This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 156784b5 Develop cpp (#762)
156784b5 is described below
commit 156784b540328076c4c43fee242306ac94977dbb
Author: Zhanhui Li <[email protected]>
AuthorDate: Wed Jun 26 19:07:14 2024 +0800
Develop cpp (#762)
* feat: add tagged-release github action pipeline
Signed-off-by: Zhanhui Li <[email protected]>
* Fix C++ SDK core dump issue (#2)
* fix: sync namespace from server Settings
* feat: use opentelemetry for tracing/metrics/logging
* Remove broken links and add targets to generate compile_commands.json
Signed-off-by: lizhanhui <[email protected]>
* fix: timer task may invoke a call to a destructing stream
Signed-off-by: Zhanhui Li <[email protected]>
* fix: update document as we have changed the way to generate
compile_commands.json
* fix: static_cast StreamState to std::uint8_t as enum class by default is
not formattable
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: lizhanhui <[email protected]>
Signed-off-by: Zhanhui Li <[email protected]>
Signed-off-by: Li Zhanhui <[email protected]>
* fix: fix stream state transition with gRPC reactor
Signed-off-by: Li Zhanhui <[email protected]>
* fix: revamp TelemetryBidiRecator
Signed-off-by: Li Zhanhui <[email protected]>
* feat: explicitly control exported symbols
Signed-off-by: Li Zhanhui <[email protected]>
* fix: release write hold when OnReadDone with ok=false and there is no
inflight write
Signed-off-by: Li Zhanhui <[email protected]>
* feat: revamp TelemetryBidiReactor states and their transition graph
Signed-off-by: Li Zhanhui <[email protected]>
* fix: example publish message QPS stats lambda
* Fifo opt (#732)
* Prepare to optimize FIFO publishing
Signed-off-by: Li Zhanhui <[email protected]>
* fix: SendReceipt now contains std::unique_ptr<Message> being sent
Signed-off-by: Li Zhanhui <[email protected]>
* fix: add doc explaining why we taking ownership of the message being sent
Signed-off-by: Li Zhanhui <[email protected]>
* feat: implement FifoProducerPartition
Signed-off-by: Li Zhanhui <[email protected]>
* feat: implement FifoProducerImpl
Signed-off-by: Li Zhanhui <[email protected]>
* feat: implement builder for FifoProducer
Signed-off-by: Li Zhanhui <[email protected]>
* fix: prepare to debug
Signed-off-by: Li Zhanhui <[email protected]>
* fix: log sending sending stages
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
* fix: spell errors
Signed-off-by: Li Zhanhui <[email protected]>
* fix: markdown code blocks should be fenced by blank lines
Signed-off-by: Li Zhanhui <[email protected]>
* fix: copyright header
Signed-off-by: Li Zhanhui <[email protected]>
* fix: copyright header
Signed-off-by: Li Zhanhui <[email protected]>
* fix: include statements
Signed-off-by: Zhanhui Li <[email protected]>
* feat: build example_fifo_producer in BUILD.bazel
Signed-off-by: Zhanhui Li <[email protected]>
* fix: #713 correct misuse of absl::make_optional
Signed-off-by: Zhanhui Li <[email protected]>
---------
Signed-off-by: Zhanhui Li <[email protected]>
Signed-off-by: lizhanhui <[email protected]>
Signed-off-by: Li Zhanhui <[email protected]>
---
.github/workflows/cpp_tagged_release.yaml | 57 ++++
.licenserc.yaml | 1 +
cpp/CMakeLists.txt | 4 +-
cpp/README.md | 23 +-
cpp/WORKSPACE | 26 +-
cpp/bazel/rocketmq_deps.bzl | 25 +-
cpp/examples/BUILD.bazel | 11 +
cpp/examples/CMakeLists.txt | 1 +
...oducerWithAsync.cpp => ExampleFifoProducer.cpp} | 71 ++--
cpp/examples/ExampleProducer.cpp | 7 +-
cpp/examples/ExampleProducerWithAsync.cpp | 8 +-
cpp/examples/ExampleProducerWithFifoMessage.cpp | 10 +-
cpp/examples/ExampleProducerWithTimedMessage.cpp | 7 +-
.../ExampleProducerWithTransactionalMessage.cpp | 7 +-
cpp/examples/ExamplePushConsumer.cpp | 4 +-
cpp/examples/ExampleSimpleConsumer.cpp | 4 +-
cpp/include/rocketmq/Configuration.h | 6 +-
cpp/include/rocketmq/FifoProducer.h | 68 ++++
cpp/include/rocketmq/Producer.h | 5 +-
cpp/include/rocketmq/SendReceipt.h | 7 +-
cpp/source/CMakeLists.txt | 3 +
cpp/source/base/Configuration.cpp | 4 +-
cpp/source/client/ClientManagerImpl.cpp | 152 +++++----
cpp/source/client/SessionImpl.cpp | 6 +-
cpp/source/client/TelemetryBidiReactor.cpp | 372 ++++++++++++++++-----
cpp/source/client/include/ClientManager.h | 10 +-
cpp/source/client/include/ClientManagerImpl.h | 14 +-
.../client/include/SendResult.h} | 14 +-
.../client/include/SendResultCallback.h} | 14 +-
cpp/source/client/include/TelemetryBidiReactor.h | 110 ++++--
cpp/source/exports.map | 6 +
.../rocketmq/FifoContext.cpp} | 21 +-
cpp/source/rocketmq/FifoProducer.cpp | 73 ++++
.../rocketmq/FifoProducerImpl.cpp} | 22 +-
cpp/source/rocketmq/FifoProducerPartition.cpp | 107 ++++++
cpp/source/rocketmq/Producer.cpp | 4 -
cpp/source/rocketmq/ProducerImpl.cpp | 51 +--
cpp/source/rocketmq/SendContext.cpp | 53 +--
cpp/source/rocketmq/include/ClientImpl.h | 4 +-
.../rocketmq/include/FifoContext.h} | 17 +-
cpp/source/rocketmq/include/FifoProducerImpl.h | 53 +++
.../rocketmq/include/FifoProducerPartition.h | 55 +++
cpp/source/rocketmq/include/ProducerImpl.h | 28 +-
cpp/source/rocketmq/include/SendContext.h | 10 +-
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 2 +-
cpp/source/rocketmq/tests/BUILD.bazel | 8 +
.../rocketmq/tests/OptionalTest.cpp} | 21 +-
cpp/source/stats/MetricBidiReactor.cpp | 40 ++-
cpp/source/stats/include/MetricBidiReactor.h | 24 +-
...trouble_shooting.sh => gen_compile_commands.sh} | 7 +-
cpp/tools/trouble_shooting.sh | 0
51 files changed, 1221 insertions(+), 436 deletions(-)
diff --git a/.github/workflows/cpp_tagged_release.yaml
b/.github/workflows/cpp_tagged_release.yaml
new file mode 100644
index 00000000..416676c6
--- /dev/null
+++ b/.github/workflows/cpp_tagged_release.yaml
@@ -0,0 +1,57 @@
+---
+name: "cpp-tagged-release"
+
+on:
+ push:
+ tags:
+ - "cpp-*"
+
+jobs:
+ tagged-release:
+ name: "C++ Tagged Release"
+ runs-on: "ubuntu-latest"
+
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/checkout@v4
+ with:
+ repository: grpc/grpc
+ # The branch, tag or SHA to checkout. When checking out the
repository that
+ # triggered a workflow, this defaults to the reference or SHA for
that event.
+ # Otherwise, uses the default branch.
+ ref: 'v1.46.3'
+ # Relative path under $GITHUB_WORKSPACE to place the repository
+ path: cpp/repo/grpc
+ submodules: true
+ - name: "Install Dependencies"
+ run: |
+ sudo apt-get install -y build-essential autoconf libtool
pkg-config cmake git libprotobuf-dev libssl-dev zlib1g-dev libgflags-dev
+ - name: "Build gRPC"
+ working-directory: ./cpp/repo/grpc
+ run: |
+ mkdir _build && cd _build
+ cmake -DCMAKE_INSTALL_PREFIX=$HOME/grpc -DgRPC_SSL_PROVIDER=package
-DgRPC_ZLIB_PROVIDER=package -DgRPC_PROTOBUF_PACKAGE_TYPE=CONFIG
-DgRPC_ZLIB_PROVIDER=package ..
+ make
+ make install
+ - name: "Build Libraries"
+ working-directory: ./cpp
+ run: |
+ mkdir _build && cd _build
+ cmake ..
+ make
+ - name: "Package"
+ working-directory: ./cpp
+ run: |
+ mkdir -p dist/lib
+ mkdir -p dist/include
+ cp -r include/rocketmq dist/include/
+ cp _build/librocketmq.so dist/lib/
+ cp _build/librocketmq.a dist/lib/
+ tar -czvf dist.tar.gz dist
+ - uses: "marvinpinto/action-automatic-releases@latest"
+ with:
+ repo_token: "${{ secrets.GITHUB_TOKEN }}"
+ prerelease: false
+ automatic_release_tag: cpp
+ files: |
+ cpp/dist.tar.gz
\ No newline at end of file
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 4edcd18f..cb5c8747 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -48,6 +48,7 @@ header:
- 'cpp/.gitignore'
- 'cpp/third_party'
- 'cpp/cmake'
+ - 'cpp/source/exports.map'
- 'php/grpc/**/*.php'
- 'php/composer.json'
- 'rust/.cargo/Cargo.lock.min'
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 374466e6..42f7cd70 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.19)
+cmake_minimum_required(VERSION 3.16)
project(rocketmq)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@@ -31,4 +31,4 @@ if (BUILD_EXAMPLES)
find_package(gflags REQUIRED)
find_package(ZLIB REQUIRED)
add_subdirectory(examples)
-endif ()
\ No newline at end of file
+endif ()
diff --git a/cpp/README.md b/cpp/README.md
index 117a2426..a6d983fb 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -217,19 +217,13 @@ if "com_google_googletest" not in native.existing_rules():
1. VSCode + Clangd
[Clangd](https://clangd.llvm.org/) is a really nice code completion tool.
Clangd requires compile_commands.json to work properly.
- To generate the file, we need clone another repository along with the
current one.
+ To generate the file, run the following command:
```sh
- git clone [email protected]:grailbio/bazel-compilation-database.git
+ ./tools/gen_compile_commands.sh
```
- From current repository root,
-
- ```sh
- ../bazel-compilation-database/generate.sh
- ```
-
- Once the script completes, you should have compile_commands.json file in
the repository root directory.
+ Once the script completes, you should have compile_commands.json file in
the workspace directory, aka, ${repository}/cpp.
LLVM project has an extension for
[clangd](https://marketplace.visualstudio.com/items?itemName=llvm-vs-code-extensions.vscode-clangd).
Please install it from the extension market.
@@ -239,8 +233,15 @@ if "com_google_googletest" not in native.existing_rules():
"C_Cpp.intelliSenseEngine": "Disabled",
"C_Cpp.autocomplete": "Disabled", // So you don't get autocomplete from
both extensions.
"C_Cpp.errorSquiggles": "Disabled", // So you don't get error squiggles
from both extensions (clangd's seem to be more reliable anyway).
- "clangd.path": "/Users/lizhanhui/usr/clangd_12.0.0/bin/clangd",
- "clangd.arguments": ["-log=verbose", "-pretty", "--background-index"],
+ "clangd.path": "/usr/bin/clangd",
+ "clangd.arguments": [
+ "-log=verbose",
+ "-pretty",
+ "--background-index",
+ "--header-insertion=never",
+ "--compile-commands-dir=${workspaceFolder}/",
+ "--query-driver=**"
+ ],
"clangd.onConfigChanged": "restart",
```
diff --git a/cpp/WORKSPACE b/cpp/WORKSPACE
index d09dd445..3c3d6476 100644
--- a/cpp/WORKSPACE
+++ b/cpp/WORKSPACE
@@ -27,4 +27,28 @@ http_archive(
load("@io_buildbuddy_buildbuddy_toolchain//:deps.bzl", "buildbuddy_deps")
buildbuddy_deps()
load("@io_buildbuddy_buildbuddy_toolchain//:rules.bzl", "buildbuddy")
-buildbuddy(name = "buildbuddy_toolchain")
\ No newline at end of file
+buildbuddy(name = "buildbuddy_toolchain")
+
+# Generate compile_commands.json
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+
+# Hedron's Compile Commands Extractor for Bazel
+# https://github.com/hedronvision/bazel-compile-commands-extractor
+http_archive(
+ name = "hedron_compile_commands",
+
+ # Replace the commit hash (0e990032f3c5a866e72615cf67e5ce22186dcb97) in
both places (below) with the latest
(https://github.com/hedronvision/bazel-compile-commands-extractor/commits/main),
rather than using the stale one here.
+ # Even better, set up Renovate and let it do the work for you (see
"Suggestion: Updates" in the README).
+ url =
"https://github.com/hedronvision/bazel-compile-commands-extractor/archive/204aa593e002cbd177d30f11f54cff3559110bb9.tar.gz",
+ strip_prefix =
"bazel-compile-commands-extractor-204aa593e002cbd177d30f11f54cff3559110bb9",
+ # When you first run this tool, it'll recommend a sha256 hash to put here
with a message like: "DEBUG: Rule 'hedron_compile_commands' indicated that a
canonical reproducible form can be obtained by modifying arguments sha256 = ..."
+)
+load("@hedron_compile_commands//:workspace_setup.bzl",
"hedron_compile_commands_setup")
+hedron_compile_commands_setup()
+load("@hedron_compile_commands//:workspace_setup_transitive.bzl",
"hedron_compile_commands_setup_transitive")
+hedron_compile_commands_setup_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive.bzl",
"hedron_compile_commands_setup_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive()
+load("@hedron_compile_commands//:workspace_setup_transitive_transitive_transitive.bzl",
"hedron_compile_commands_setup_transitive_transitive_transitive")
+hedron_compile_commands_setup_transitive_transitive_transitive()
\ No newline at end of file
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index eae31a6f..684e55eb 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -16,7 +16,6 @@ def rocketmq_deps():
sha256 =
"b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
strip_prefix = "googletest-release-1.11.0",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
"https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
],
)
@@ -27,7 +26,6 @@ def rocketmq_deps():
strip_prefix = "filesystem-1.5.0",
sha256 =
"eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
"https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
@@ -39,7 +37,6 @@ def rocketmq_deps():
strip_prefix = "spdlog-1.9.2",
sha256 =
"6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
"https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
@@ -51,7 +48,6 @@ def rocketmq_deps():
strip_prefix = "fmt-8.0.1",
sha256 =
"b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
"https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
@@ -63,7 +59,6 @@ def rocketmq_deps():
sha256 =
"8b28fdd45bab62d15db232ec404248901842e5340299a57765e48abe8a80d930",
strip_prefix = "protobuf-3.20.1",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/protobuf/protobuf-3.20.1.tar.gz",
"https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.20.1.tar.gz",
],
)
@@ -74,7 +69,6 @@ def rocketmq_deps():
sha256 =
"507e38c8d95c7efa4f3b1c0595a8e8f139c885cb41a76cab7e20e4e67ae87731",
strip_prefix = "rules_proto_grpc-4.1.1",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto_grpc/rules_proto_grpc-4.1.1.tar.gz",
"https://github.com/rules-proto-grpc/rules_proto_grpc/archive/refs/tags/4.1.1.tar.gz",
],
)
@@ -84,7 +78,6 @@ def rocketmq_deps():
name = "io_opencensus_cpp",
sha256 =
"317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz",
"https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz",
],
strip_prefix = "opencensus-cpp-0.4.1",
@@ -96,7 +89,6 @@ def rocketmq_deps():
sha256 =
"dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4",
strip_prefix = "abseil-cpp-20211102.0",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/abseil/abseil-cpp-20211102.0.tar.gz",
"https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz",
],
)
@@ -107,7 +99,6 @@ def rocketmq_deps():
strip_prefix = "gflags-2.2.2",
sha256 =
"34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/gflags/gflags-2.2.2.tar.gz",
"https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz",
],
)
@@ -118,7 +109,6 @@ def rocketmq_deps():
strip_prefix = "grpc-1.46.3",
sha256 =
"d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
"https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
],
)
@@ -130,7 +120,6 @@ def rocketmq_deps():
build_file = "@org_apache_rocketmq//third_party:asio.BUILD",
strip_prefix = "asio-1.18.2",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz",
"https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz",
],
)
@@ -140,7 +129,6 @@ def rocketmq_deps():
name = "com_google_googleapis",
sha256 =
"e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
"https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
],
strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
@@ -152,7 +140,6 @@ def rocketmq_deps():
sha256 =
"cdf6b84084aad8f10bf20b46b77cb48d83c319ebe6458a18e9d2cebf57807cdd",
strip_prefix = "rules_python-0.8.1",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-python/rules_python-0.8.1.tar.gz",
"https://github.com/bazelbuild/rules_python/archive/refs/tags/0.8.1.tar.gz",
],
)
@@ -161,7 +148,6 @@ def rocketmq_deps():
http_archive,
name = "rules_swift",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_swift/rules_swift-0.27.0.tar.gz",
"https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz",
],
strip_prefix = "rules_swift-0.27.0",
@@ -172,7 +158,6 @@ def rocketmq_deps():
name = "io_bazel_rules_go",
sha256 =
"685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
],
@@ -184,7 +169,15 @@ def rocketmq_deps():
sha256 =
"e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d",
strip_prefix = "rules_proto-4.0.0-3.20.0",
urls = [
-
"https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz",
"https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz",
],
)
+
+ maybe(
+ http_archive,
+ name = "com_github_opentelemetry",
+ strip_prefix = "opentelemetry-cpp-1.14.2",
+ urls = [
+
"https://github.com/open-telemetry/opentelemetry-cpp/archive/refs/tags/v1.14.2.tar.gz"
+ ]
+ )
diff --git a/cpp/examples/BUILD.bazel b/cpp/examples/BUILD.bazel
index 5113e150..771b0e04 100644
--- a/cpp/examples/BUILD.bazel
+++ b/cpp/examples/BUILD.bazel
@@ -91,4 +91,15 @@ cc_binary(
"//source/rocketmq:rocketmq_library",
"@com_github_gflags_gflags//:gflags",
],
+)
+
+cc_binary(
+ name = "example_fifo_producer",
+ srcs = [
+ "ExampleFifoProducer.cpp",
+ ],
+ deps = [
+ "//source/rocketmq:rocketmq_library",
+ "@com_github_gflags_gflags//:gflags",
+ ]
)
\ No newline at end of file
diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b0399..27304477 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@ function(add_example name file)
endfunction()
add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
add_example(example_producer_with_fifo_message
ExampleProducerWithFifoMessage.cpp)
add_example(example_producer_with_timed_message
ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp
b/cpp/examples/ExampleFifoProducer.cpp
similarity index 73%
copy from cpp/examples/ExampleProducerWithAsync.cpp
copy to cpp/examples/ExampleFifoProducer.cpp
index 62ee7781..9d99be36 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -17,16 +17,19 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
-#include <cstdint>
#include <iostream>
-#include <mutex>
+#include <memory>
#include <random>
#include <string>
#include <system_error>
#include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
#include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
using namespace ROCKETMQ_NAMESPACE;
@@ -94,30 +97,33 @@ DEFINE_string(topic, "standard_topic_sample", "Topic to
which messages are publi
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
-DEFINE_uint32(concurrency, 128, "Concurrency of async send");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
auto& logger = getLogger();
- logger.setConsoleLevel(Level::Info);
- logger.setLevel(Level::Info);
+ logger.setConsoleLevel(Level::Debug);
+ logger.setLevel(Level::Debug);
logger.init();
+ // Access Key/Secret pair may be acquired from management console
CredentialsProviderPtr credentials_provider;
if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
credentials_provider =
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key,
FLAGS_access_secret);
}
- // In most case, you don't need to create too many producers, singletion
pattern is recommended.
- auto producer = Producer::newBuilder()
+ // In most case, you don't need to create too many producers, singleton
pattern is recommended.
+ auto producer = FifoProducer::newBuilder()
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
+ .withConcurrency(FLAGS_concurrency)
.withTopics({FLAGS_topic})
.build();
@@ -127,8 +133,8 @@ int main(int argc, char* argv[]) {
auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
@@ -145,25 +151,32 @@ int main(int argc, char* argv[]) {
std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency));
- auto send_callback = [&](const std::error_code& ec, const SendReceipt&
receipt) {
- std::unique_lock<std::mutex> lk(mtx);
- semaphore->release();
- completed++;
- count++;
- if (completed >= FLAGS_total) {
- cv.notify_all();
+ try {
+ for (std::size_t i = 0; i < FLAGS_total; ++i) {
+ auto message = Message::newBuilder()
+ .withTopic(FLAGS_topic)
+ .withTag("TagA")
+ .withKeys({"Key-" + std::to_string(i)})
+ .withGroup("message-group" + std::to_string(i %
FLAGS_concurrency))
+ .withBody(body)
+ .build();
+ std::error_code ec;
+ auto callback = [&](const std::error_code& ec, const SendReceipt&
receipt) mutable {
+ completed++;
+ count++;
+ semaphore->release();
+
+ if (completed >= FLAGS_total) {
+ cv.notify_all();
+ }
+ };
+
+ semaphore->acquire();
+ producer.send(std::move(message), callback);
+ std::cout << "Cached No." << i << " message" << std::endl;
}
- };
-
- for (std::size_t i = 0; i < FLAGS_total; ++i) {
- auto message = Message::newBuilder()
- .withTopic(FLAGS_topic)
- .withTag("TagA")
- .withKeys({"Key-" + std::to_string(i)})
- .withBody(body)
- .build();
- semaphore->acquire();
- producer.send(std::move(message), send_callback);
+ } catch (...) {
+ std::cerr << "Ah...No!!!" << std::endl;
}
{
@@ -178,4 +191,4 @@ int main(int argc, char* argv[]) {
}
return EXIT_SUCCESS;
-}
\ No newline at end of file
+}
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 57293c24..5e20cc12 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -57,6 +57,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -77,7 +78,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withTopics({FLAGS_topic})
.build();
@@ -88,8 +89,8 @@ int main(int argc, char* argv[]) {
auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp
b/cpp/examples/ExampleProducerWithAsync.cpp
index 62ee7781..d88dfc85 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -17,7 +17,6 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
-#include <cstdint>
#include <iostream>
#include <mutex>
#include <random>
@@ -97,6 +96,7 @@ DEFINE_uint32(total, 256, "Number of sample messages to
publish");
DEFINE_uint32(concurrency, 128, "Concurrency of async send");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -116,7 +116,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withTopics({FLAGS_topic})
.build();
@@ -127,8 +127,8 @@ int main(int argc, char* argv[]) {
auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index 09b8d407..4fa34f9d 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -74,7 +75,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withTopics({FLAGS_topic})
.build();
@@ -83,10 +84,11 @@ int main(int argc, char* argv[]) {
std::atomic_long count(0);
auto stats_lambda = [&] {
+ std::cout << "Stats thread starts" << std::endl;
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
@@ -109,7 +111,7 @@ int main(int argc, char* argv[]) {
.build();
std::error_code ec;
SendReceipt send_receipt = producer.send(std::move(message), ec);
- std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
+ // std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
count++;
}
} catch (...) {
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index 8f12f5b6..d6237459 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -56,6 +56,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -75,7 +76,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withTopics({FLAGS_topic})
.build();
@@ -86,8 +87,8 @@ int main(int argc, char* argv[]) {
auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index befb18ca..50620c5a 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -54,6 +54,7 @@ DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -79,7 +80,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withTopics({FLAGS_topic})
.withTransactionChecker(checker)
@@ -91,8 +92,8 @@ int main(int argc, char* argv[]) {
auto stats_lambda = [&] {
while (!stopped.load(std::memory_order_relaxed)) {
long cnt = count.load(std::memory_order_relaxed);
- while (count.compare_exchange_weak(cnt, 0)) {
- break;
+ while (!count.compare_exchange_weak(cnt, 0)) {
+ cnt = count.load(std::memory_order_relaxed);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "QPS: " << cnt << std::endl;
diff --git a/cpp/examples/ExamplePushConsumer.cpp
b/cpp/examples/ExamplePushConsumer.cpp
index 1e20b2ee..66a85f4b 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -16,7 +16,6 @@
*/
#include <chrono>
#include <iostream>
-#include <mutex>
#include <thread>
#include "gflags/gflags.h"
@@ -30,6 +29,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service
access URL, provide
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -58,7 +58,7 @@ int main(int argc, char* argv[]) {
.withEndpoints(FLAGS_access_point)
.withRequestTimeout(std::chrono::seconds(3))
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.withConsumeThreads(4)
.withListener(listener)
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp
b/cpp/examples/ExampleSimpleConsumer.cpp
index 4c30214f..aedec71e 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -16,7 +16,6 @@
*/
#include <chrono>
#include <iostream>
-#include <thread>
#include "gflags/gflags.h"
#include "rocketmq/Logger.h"
@@ -29,6 +28,7 @@ DEFINE_string(access_point, "121.196.167.124:8081", "Service
access URL, provide
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
@@ -51,7 +51,7 @@ int main(int argc, char* argv[]) {
.withConfiguration(Configuration::newBuilder()
.withEndpoints(FLAGS_access_point)
.withCredentialsProvider(credentials_provider)
- .withSsl(true)
+ .withSsl(FLAGS_tls)
.build())
.subscribe(FLAGS_topic, tag)
.build();
diff --git a/cpp/include/rocketmq/Configuration.h
b/cpp/include/rocketmq/Configuration.h
index 0037c270..6dcd4137 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -44,7 +44,7 @@ public:
}
bool withSsl() const {
- return withSsl_;
+ return tls_;
}
protected:
@@ -56,7 +56,7 @@ private:
std::string endpoints_;
CredentialsProviderPtr credentials_provider_;
std::chrono::milliseconds
request_timeout_{ConfigurationDefaults::RequestTimeout};
- bool withSsl_ = true;
+ bool tls_ = true;
};
class ConfigurationBuilder {
@@ -67,7 +67,7 @@ public:
ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds
request_timeout);
- ConfigurationBuilder& withSsl(bool enable);
+ ConfigurationBuilder& withSsl(bool with_ssl);
Configuration build();
diff --git a/cpp/include/rocketmq/FifoProducer.h
b/cpp/include/rocketmq/FifoProducer.h
new file mode 100644
index 00000000..1ee20d91
--- /dev/null
+++ b/cpp/include/rocketmq/FifoProducer.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "Configuration.h"
+#include "Message.h"
+#include "RocketMQ.h"
+#include "SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl;
+class FifoProducerBuilder;
+class ProducerImpl;
+
+class FifoProducer {
+public:
+ static FifoProducerBuilder newBuilder();
+
+ void send(MessageConstPtr message, SendCallback callback);
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+
+ explicit FifoProducer(std::shared_ptr<FifoProducerImpl> impl) :
impl_(std::move(impl)) {
+ }
+
+ void start();
+
+ friend class FifoProducerBuilder;
+};
+
+class FifoProducerBuilder {
+public:
+ FifoProducerBuilder();
+
+ FifoProducerBuilder& withConfiguration(Configuration configuration);
+
+ FifoProducerBuilder& withTopics(const std::vector<std::string>& topics);
+
+ FifoProducerBuilder& withConcurrency(std::size_t concurrency);
+
+ FifoProducer build();
+
+private:
+ std::shared_ptr<FifoProducerImpl> impl_;
+ std::shared_ptr<ProducerImpl> producer_impl_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index 42004eb3..6b42843d 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -16,20 +16,17 @@
*/
#pragma once
-#include <chrono>
-#include <functional>
#include <memory>
#include <system_error>
#include <vector>
#include "Configuration.h"
-#include "ErrorCode.h"
-#include "Logger.h"
#include "Message.h"
#include "SendCallback.h"
#include "SendReceipt.h"
#include "Transaction.h"
#include "TransactionChecker.h"
+#include "rocketmq/Logger.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/include/rocketmq/SendReceipt.h
index 489df5ec..7eef6e79 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -16,20 +16,21 @@
*/
#pragma once
-#include <cstdint>
#include <string>
-#include <utility>
#include "RocketMQ.h"
+#include "rocketmq/Message.h"
ROCKETMQ_NAMESPACE_BEGIN
struct SendReceipt {
+ std::string target;
+
std::string message_id;
std::string transaction_id;
- std::string target;
+ MessageConstPtr message;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/CMakeLists.txt b/cpp/source/CMakeLists.txt
index d42c6a1a..dbf5ab01 100644
--- a/cpp/source/CMakeLists.txt
+++ b/cpp/source/CMakeLists.txt
@@ -53,7 +53,10 @@ target_link_libraries(rocketmq_shared
opencensus::stats
opencensus_proto
spdlog)
+set(VERSION_SCRIPT ${CMAKE_CURRENT_SOURCE_DIR}/exports.map)
+set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS}
-Wl,--version-script=${VERSION_SCRIPT}")
set_target_properties(rocketmq_shared
PROPERTIES
+ LINK_DEPENDS ${VERSION_SCRIPT}
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
LIBRARY_OUTPUT_NAME rocketmq)
\ No newline at end of file
diff --git a/cpp/source/base/Configuration.cpp
b/cpp/source/base/Configuration.cpp
index 2a136d5d..66cff2e8 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -38,8 +38,8 @@ ConfigurationBuilder&
ConfigurationBuilder::withRequestTimeout(std::chrono::mill
return *this;
}
-ConfigurationBuilder& ConfigurationBuilder::withSsl(bool enable) {
- configuration_.withSsl_ = enable;
+ConfigurationBuilder& ConfigurationBuilder::withSsl(bool with_ssl) {
+ configuration_.tls_ = with_ssl;
return *this;
}
diff --git a/cpp/source/client/ClientManagerImpl.cpp
b/cpp/source/client/ClientManagerImpl.cpp
index 5865dbb2..7d724c7b 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -24,34 +24,30 @@
#include <utility>
#include <vector>
-#include "apache/rocketmq/v2/definition.pb.h"
#include "InvocationContext.h"
#include "LogInterceptor.h"
#include "LogInterceptorFactory.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MessageExt.h"
-#include "MetadataConstants.h"
#include "MixAll.h"
#include "Protocol.h"
#include "ReceiveMessageContext.h"
#include "RpcClient.h"
#include "RpcClientImpl.h"
#include "Scheduler.h"
-#include "TlsHelper.h"
+#include "SchedulerImpl.h"
#include "UtilAll.h"
#include "google/protobuf/util/time_util.h"
#include "grpcpp/create_channel.h"
#include "rocketmq/ErrorCode.h"
-#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool
withSsl)
- : scheduler_(std::make_shared<SchedulerImpl>()),
resource_namespace_(std::move(resource_namespace)),
+ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool
with_ssl)
+ : scheduler_(std::make_shared<SchedulerImpl>()),
+ resource_namespace_(std::move(resource_namespace)),
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
- withSsl_(withSsl){
+ with_ssl_(with_ssl) {
certificate_verifier_ =
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
@@ -175,8 +171,10 @@ std::vector<std::string>
ClientManagerImpl::cleanOfflineRpcClients() {
return removed;
}
-void ClientManagerImpl::heartbeat(const std::string& target_host, const
Metadata& metadata,
- const HeartbeatRequest& request,
std::chrono::milliseconds timeout,
+void ClientManagerImpl::heartbeat(const std::string& target_host,
+ const Metadata& metadata,
+ const HeartbeatRequest& request,
+ std::chrono::milliseconds timeout,
const std::function<void(const
std::error_code&, const HeartbeatResponse&)>& cb) {
SPDLOG_DEBUG("Prepare to send heartbeat to {}. Request: {}", target_host,
request.DebugString());
auto client = getRpcClient(target_host, true);
@@ -279,8 +277,10 @@ void ClientManagerImpl::doHeartbeat() {
}
}
-bool ClientManagerImpl::send(const std::string& target_host, const Metadata&
metadata, SendMessageRequest& request,
- SendCallback cb) {
+bool ClientManagerImpl::send(const std::string& target_host,
+ const Metadata& metadata,
+ SendMessageRequest& request,
+ SendResultCallback cb) {
assert(cb);
SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}",
target_host, request.DebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
@@ -306,15 +306,14 @@ bool ClientManagerImpl::send(const std::string&
target_host, const Metadata& met
return;
}
- SendReceipt send_receipt = {};
- send_receipt.target = target_host;
- std::error_code ec;
+ SendResult send_result = {};
+ send_result.target = target_host;
if (!invocation_context->status.ok()) {
SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code:
{}, gRPC error message: {}",
invocation_context->remote_address,
invocation_context->status.error_code(),
invocation_context->status.error_message());
- ec = ErrorCode::RequestTimeout;
- cb(ec, send_receipt);
+ send_result.ec = ErrorCode::RequestTimeout;
+ cb(send_result);
return;
}
@@ -323,8 +322,8 @@ bool ClientManagerImpl::send(const std::string&
target_host, const Metadata& met
case rmq::Code::OK: {
if (!invocation_context->response.entries().empty()) {
auto first = invocation_context->response.entries().begin();
- send_receipt.message_id = first->message_id();
- send_receipt.transaction_id = first->transaction_id();
+ send_result.message_id = first->message_id();
+ send_result.transaction_id = first->transaction_id();
} else {
SPDLOG_ERROR("Unexpected send-message-response: {}",
invocation_context->response.DebugString());
}
@@ -333,126 +332,127 @@ bool ClientManagerImpl::send(const std::string&
target_host, const Metadata& met
case rmq::Code::ILLEGAL_TOPIC: {
SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::IllegalTopic;
+ send_result.ec = ErrorCode::IllegalTopic;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_TAG: {
SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageTag;
+ send_result.ec = ErrorCode::IllegalMessageTag;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_KEY: {
SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageKey;
+ send_result.ec = ErrorCode::IllegalMessageKey;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageGroup;
+ send_result.ec = ErrorCode::IllegalMessageGroup;
break;
}
case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::IllegalMessageProperty;
+ send_result.ec = ErrorCode::IllegalMessageProperty;
break;
}
case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}",
status.message(), invocation_context->remote_address);
- ec = ErrorCode::MessagePropertiesTooLarge;
+ send_result.ec = ErrorCode::MessagePropertiesTooLarge;
break;
}
case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::MessageBodyTooLarge;
+ send_result.ec = ErrorCode::MessageBodyTooLarge;
break;
}
case rmq::Code::TOPIC_NOT_FOUND: {
SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::TopicNotFound;
+ send_result.ec = ErrorCode::TopicNotFound;
break;
}
case rmq::Code::NOT_FOUND: {
SPDLOG_WARN("NotFound: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::NotFound;
+ send_result.ec = ErrorCode::NotFound;
break;
}
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::Unauthorized;
+ send_result.ec = ErrorCode::Unauthorized;
break;
}
-
+
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("Forbidden: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::Forbidden;
+ send_result.ec = ErrorCode::Forbidden;
break;
}
case rmq::Code::MESSAGE_CORRUPTED: {
SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::MessageCorrupted;
+ send_result.ec = ErrorCode::MessageCorrupted;
break;
}
case rmq::Code::TOO_MANY_REQUESTS: {
SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::TooManyRequests;
+ send_result.ec = ErrorCode::TooManyRequests;
break;
}
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::InternalServerError;
+ send_result.ec = ErrorCode::InternalServerError;
break;
}
case rmq::Code::HA_NOT_AVAILABLE: {
SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::InternalServerError;
+ send_result.ec = ErrorCode::InternalServerError;
break;
}
case rmq::Code::PROXY_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(),
invocation_context->remote_address);
- ec = ErrorCode::GatewayTimeout;
+ send_result.ec = ErrorCode::GatewayTimeout;
break;
}
case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
- SPDLOG_WARN("Message-property-conflict-with-type: Host={},
Response={}", invocation_context->remote_address,
invocation_context->response.DebugString());
- ec = ErrorCode::MessagePropertyConflictWithType;
+ SPDLOG_WARN("Message-property-conflict-with-type: Host={},
Response={}", invocation_context->remote_address,
+ invocation_context->response.DebugString());
+ send_result.ec = ErrorCode::MessagePropertyConflictWithType;
break;
}
default: {
SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest.
Host={}", invocation_context->remote_address);
- ec = ErrorCode::NotSupported;
+ send_result.ec = ErrorCode::NotSupported;
break;
}
}
- cb(ec, send_receipt);
+ cb(send_result);
};
invocation_context->callback = completion_callback;
@@ -470,7 +470,8 @@ std::shared_ptr<grpc::Channel>
ClientManagerImpl::createChannel(const std::strin
std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>>
interceptor_factories;
interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
- target_host, withSsl_ ? channel_credential_ :
grpc::InsecureChannelCredentials(), channel_arguments_,
std::move(interceptor_factories));
+ target_host, with_ssl_ ? channel_credential_ :
grpc::InsecureChannelCredentials(), channel_arguments_,
+ std::move(interceptor_factories));
return channel;
}
@@ -513,27 +514,28 @@ void ClientManagerImpl::cleanRpcClients() {
rpc_clients_.clear();
}
-SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue&
message_queue,
- const SendMessageResponse&
response, std::error_code& ec) {
- SendReceipt send_receipt;
+SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue&
message_queue,
+ const SendMessageResponse&
response,
+ std::error_code& ec) {
+ SendResult send_result;
switch (response.status().code()) {
case rmq::Code::OK: {
assert(response.entries_size() > 0);
- send_receipt.message_id = response.entries().begin()->message_id();
- send_receipt.transaction_id =
response.entries().begin()->transaction_id();
- return send_receipt;
+ send_result.message_id = response.entries().begin()->message_id();
+ send_result.transaction_id =
response.entries().begin()->transaction_id();
+ return send_result;
}
case rmq::Code::ILLEGAL_TOPIC: {
ec = ErrorCode::BadRequest;
- return send_receipt;
+ return send_result;
}
default: {
// TODO: handle other cases.
break;
}
}
- return send_receipt;
+ return send_result;
}
void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
@@ -541,8 +543,10 @@ void
ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) {
clients_.emplace_back(std::move(client));
}
-void ClientManagerImpl::resolveRoute(const std::string& target_host, const
Metadata& metadata,
- const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
+void ClientManagerImpl::resolveRoute(const std::string& target_host,
+ const Metadata& metadata,
+ const QueryRouteRequest& request,
+ std::chrono::milliseconds timeout,
const std::function<void(const
std::error_code&, const TopicRouteDataPtr&)>& cb) {
SPDLOG_DEBUG("Name server connection URL: {}", target_host);
SPDLOG_DEBUG("Query route request: {}", request.DebugString());
@@ -646,7 +650,9 @@ void ClientManagerImpl::resolveRoute(const std::string&
target_host, const Metad
}
void ClientManagerImpl::queryAssignment(
- const std::string& target, const Metadata& metadata, const
QueryAssignmentRequest& request,
+ const std::string& target,
+ const Metadata& metadata,
+ const QueryAssignmentRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const
QueryAssignmentResponse&)>& cb) {
SPDLOG_DEBUG("Prepare to send query assignment request to
broker[address={}]", target);
@@ -748,8 +754,10 @@ void ClientManagerImpl::queryAssignment(
client->asyncQueryAssignment(request, invocation_context);
}
-void ClientManagerImpl::receiveMessage(const std::string& target_host, const
Metadata& metadata,
- const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout,
+void ClientManagerImpl::receiveMessage(const std::string& target_host,
+ const Metadata& metadata,
+ const ReceiveMessageRequest& request,
+ std::chrono::milliseconds timeout,
ReceiveMessageCallback cb) {
SPDLOG_DEBUG("Prepare to receive message from {} asynchronously. Request:
{}", target_host, request.DebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
@@ -765,7 +773,6 @@ State ClientManagerImpl::state() const {
}
MessageConstSharedPtr ClientManagerImpl::wrapMessage(const rmq::Message& item)
{
- assert(item.topic().resource_namespace() == resource_namespace_);
auto builder = Message::newBuilder();
// base
@@ -955,8 +962,11 @@ SchedulerSharedPtr ClientManagerImpl::getScheduler() {
return scheduler_;
}
-void ClientManagerImpl::ack(const std::string& target, const Metadata&
metadata, const AckMessageRequest& request,
- std::chrono::milliseconds timeout, const
std::function<void(const std::error_code&)>& cb) {
+void ClientManagerImpl::ack(const std::string& target,
+ const Metadata& metadata,
+ const AckMessageRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const std::error_code&)>&
cb) {
std::string target_host(target.data(), target.length());
SPDLOG_DEBUG("Prepare to ack message against {} asynchronously.
AckMessageRequest: {}", target_host,
request.DebugString());
@@ -1066,8 +1076,11 @@ void ClientManagerImpl::ack(const std::string& target,
const Metadata& metadata,
}
void ClientManagerImpl::changeInvisibleDuration(
- const std::string& target_host, const Metadata& metadata, const
ChangeInvisibleDurationRequest& request,
- std::chrono::milliseconds timeout, const std::function<void(const
std::error_code&)>& completion_callback) {
+ const std::string& target_host,
+ const Metadata& metadata,
+ const ChangeInvisibleDurationRequest& request,
+ std::chrono::milliseconds timeout,
+ const std::function<void(const std::error_code&)>& completion_callback) {
RpcClientSharedPtr client = getRpcClient(target_host);
assert(client);
auto invocation_context = new
InvocationContext<ChangeInvisibleDurationResponse>();
@@ -1133,7 +1146,7 @@ void ClientManagerImpl::changeInvisibleDuration(
ec = ErrorCode::Forbidden;
break;
}
-
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}, host={}", status.message(),
invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
@@ -1159,7 +1172,9 @@ void ClientManagerImpl::changeInvisibleDuration(
}
void ClientManagerImpl::endTransaction(
- const std::string& target_host, const Metadata& metadata, const
EndTransactionRequest& request,
+ const std::string& target_host,
+ const Metadata& metadata,
+ const EndTransactionRequest& request,
std::chrono::milliseconds timeout,
const std::function<void(const std::error_code&, const
EndTransactionResponse&)>& cb) {
RpcClientSharedPtr client = getRpcClient(target_host);
@@ -1339,7 +1354,7 @@ void
ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
ec = ErrorCode::ServiceUnavailable;
break;
}
-
+
case rmq::Code::TOO_MANY_REQUESTS: {
ec = ErrorCode::TooManyRequests;
break;
@@ -1362,7 +1377,8 @@ void
ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
}
-std::error_code ClientManagerImpl::notifyClientTermination(const std::string&
target_host, const Metadata& metadata,
+std::error_code ClientManagerImpl::notifyClientTermination(const std::string&
target_host,
+ const Metadata&
metadata,
const
NotifyClientTerminationRequest& request,
std::chrono::milliseconds timeout) {
std::error_code ec;
@@ -1446,4 +1462,4 @@ void ClientManagerImpl::submit(std::function<void()>
task) {
const char* ClientManagerImpl::HEARTBEAT_TASK_NAME = "heartbeat-task";
const char* ClientManagerImpl::STATS_TASK_NAME = "stats-task";
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/client/SessionImpl.cpp
b/cpp/source/client/SessionImpl.cpp
index 0ca3fff2..36416829 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -16,12 +16,14 @@
*/
#include "SessionImpl.h"
+
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-SessionImpl::SessionImpl(std::weak_ptr<Client> client,
std::shared_ptr<RpcClient> rpc_client) : client_(client),
rpc_client_(rpc_client) {
+SessionImpl::SessionImpl(std::weak_ptr<Client> client,
std::shared_ptr<RpcClient> rpc_client)
+ : client_(client), rpc_client_(rpc_client) {
telemetry_ = rpc_client->asyncTelemetry(client_);
syncSettings();
}
@@ -39,8 +41,8 @@ void SessionImpl::syncSettings() {
}
SessionImpl::~SessionImpl() {
- SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
telemetry_->fireClose();
+ SPDLOG_DEBUG("Session for {} destructed", rpc_client_->remoteAddress());
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index 6c5b2f93..d75ac361 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -22,21 +22,24 @@
#include <utility>
#include "ClientManager.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub,
std::string peer_address)
- : client_(client), peer_address_(std::move(peer_address)),
stream_state_(StreamState::Created) {
- auto ptr = client.lock();
+ : client_(client),
+ peer_address_(std::move(peer_address)),
+ read_state_(StreamState::Created),
+ write_state_(StreamState::Created) {
+ auto ptr = client_.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
Metadata metadata;
@@ -45,69 +48,126 @@
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
+ write_state_ = StreamState::Ready;
+ // Increase hold for write stream.
+ AddHold();
StartCall();
}
TelemetryBidiReactor::~TelemetryBidiReactor() {
- SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}",
peer_address_,
- static_cast<std::uint8_t>(stream_state_));
+ SPDLOG_INFO("Telemetry stream for {} destructed. ReadStreamState={},
WriteStreamState={}", peer_address_,
+ static_cast<std::uint8_t>(read_state_),
static_cast<std::uint8_t>(read_state_));
}
bool TelemetryBidiReactor::await() {
- absl::MutexLock lk(&server_setting_received_mtx_);
- if (server_setting_received_) {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Created != write_state_) {
return true;
}
- server_setting_received_cv_.Wait(&server_setting_received_mtx_);
- return server_setting_received_;
+ state_cv_.Wait(&state_mtx_);
+ return StreamState::Error != write_state_;
}
void TelemetryBidiReactor::OnWriteDone(bool ok) {
- SPDLOG_DEBUG("OnWriteDone: {}", ok);
-
- {
- bool expect = true;
- if (!command_inflight_.compare_exchange_strong(expect, false,
std::memory_order_relaxed)) {
- SPDLOG_WARN("Illegal command-inflight state");
- }
- }
+ SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);
if (!ok) {
- SPDLOG_WARN("Failed to write telemetry command {} to {}",
write_.DebugString(), peer_address_);
+ RemoveHold();
{
- absl::MutexLock lk(&stream_state_mtx_);
- stream_state_ = StreamState::WriteDone;
- }
+ absl::MutexLock lk(&state_mtx_);
+ SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
+ write_state_ = StreamState::Error;
+
+ // Sync read state.
+ switch (read_state_) {
+ case StreamState::Created:
+ case StreamState::Ready: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ break;
+ }
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ read_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed: {
+ break;
+ }
+ }
- fireClose();
+ state_cv_.SignalAll();
+ }
return;
+ } else {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Inflight == write_state_) {
+ write_state_ = StreamState::Ready;
+ }
}
+ // Check if the read stream has started.
+ fireRead();
+
+ // Remove the command that has been written to server.
{
- absl::MutexLock lk(&stream_state_mtx_);
- if (StreamState::Created == stream_state_) {
- stream_state_ = StreamState::Active;
- fireRead();
- }
+ absl::MutexLock lk(&writes_mtx_);
+ writes_.pop_front();
}
- fireWrite();
+ tryWriteNext();
}
void TelemetryBidiReactor::OnReadDone(bool ok) {
- SPDLOG_DEBUG("OnReadDone: ok={}", ok);
- if (!ok) {
- if (client_.lock()) {
- SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
- }
-
- {
- absl::MutexLock lk(&stream_state_mtx_);
- stream_state_ = StreamState::ReadDone;
+ SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
+ {
+ absl::MutexLock lk(&state_mtx_);
+ if (!ok) {
+ // Remove read hold.
+ RemoveHold();
+ {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Error));
+ read_state_ = StreamState::Error;
+ SPDLOG_WARN("Failed to read from telemetry stream from {}",
peer_address_);
+
+ // Sync write state
+ switch (write_state_) {
+ case StreamState::Created: {
+ // Not reachable
+ break;
+ }
+ case StreamState::Ready: {
+ write_state_ = StreamState::Closed;
+ // There is no inflight write request, remove write hold on its
behalf.
+ RemoveHold();
+ state_cv_.SignalAll();
+ break;
+ }
+ case StreamState::Inflight: {
+ write_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed: {
+ break;
+ }
+ }
+ }
+ return;
+ } else if (StreamState::Closing == read_state_) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ return;
}
- fireClose();
- return;
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_,
read_.DebugString());
@@ -122,13 +182,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_,
settings.DebugString());
applySettings(settings);
- {
- absl::MutexLock lk(&server_setting_received_mtx_);
- if (!server_setting_received_) {
- server_setting_received_ = true;
- server_setting_received_cv_.SignalAll();
- }
- }
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
@@ -151,11 +204,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
TelemetryCommand response;
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace
is not supported");
- {
- absl::MutexLock lk(&writes_mtx_);
- writes_.push_back(response);
- }
- fireWrite();
+ write(std::move(response));
break;
}
@@ -188,7 +237,18 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}
- fireRead();
+ {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Inflight == read_state_) {
+ SPDLOG_DEBUG("Spawn new read op, read-state={}",
static_cast<std::uint8_t>(read_state_));
+ StartRead(&read_);
+ } else if (read_state_ == StreamState::Closing) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ }
+ }
}
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
@@ -247,10 +307,33 @@ void TelemetryBidiReactor::applyBackoffPolicy(const
rmq::Settings& settings, std
}
void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings&
settings, std::shared_ptr<Client> client) {
+ // The server may have implicitly assumed a namespace for the client.
+ if (!settings.publishing().topics().empty()) {
+ for (const auto& topic : settings.publishing().topics()) {
+ if (topic.resource_namespace() != client->config().resource_namespace) {
+ SPDLOG_INFO("Client namespace is changed from [{}] to [{}]",
client->config().resource_namespace,
+ topic.resource_namespace());
+ client->config().resource_namespace = topic.resource_namespace();
+ break;
+ }
+ }
+ }
client->config().publisher.max_body_size =
settings.publishing().max_body_size();
}
void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings&
settings, std::shared_ptr<Client> client) {
+ // The server may have implicitly assumed a namespace for the client.
+ if (!settings.subscription().subscriptions().empty()) {
+ for (const auto& subscription : settings.subscription().subscriptions()) {
+ if (subscription.topic().resource_namespace() !=
client->config().resource_namespace) {
+ SPDLOG_INFO("Client namespace is changed from [{}] to [{}]",
client->config().resource_namespace,
+ subscription.topic().resource_namespace());
+ client->config().resource_namespace =
subscription.topic().resource_namespace();
+ break;
+ }
+ }
+ }
+
client->config().subscriber.fifo = settings.subscription().fifo();
auto polling_timeout =
google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout());
@@ -259,48 +342,144 @@ void TelemetryBidiReactor::applySubscriptionConfig(const
rmq::Settings& settings
}
void TelemetryBidiReactor::fireRead() {
- SPDLOG_DEBUG("{}#fireRead", peer_address_);
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Created != read_state_) {
+ SPDLOG_DEBUG("Further read from {} is not allowded due to
stream-state={}", peer_address_,
+ static_cast<std::uint8_t>(read_state_));
+ return;
+ }
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Ready));
+ read_state_ = StreamState::Ready;
+ AddHold();
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Inflight));
+ read_state_ = StreamState::Inflight;
StartRead(&read_);
}
void TelemetryBidiReactor::write(TelemetryCommand command) {
+ SPDLOG_DEBUG("{}#write", peer_address_);
+ {
+ absl::MutexLock lk(&state_mtx_);
+ // Reject incoming write commands if the stream state is closing or has
witnessed some error.
+ switch (write_state_) {
+ case StreamState::Closing:
+ case StreamState::Error:
+ case StreamState::Closed:
+ return;
+ default:
+ // no-op
+ break;
+ }
+ }
+
{
absl::MutexLock lk(&writes_mtx_);
writes_.push_back(command);
}
- fireWrite();
+ tryWriteNext();
}
-void TelemetryBidiReactor::fireWrite() {
- SPDLOG_DEBUG("{}#fireWrite", peer_address_);
+void TelemetryBidiReactor::tryWriteNext() {
+ SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);
+ {
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Error == write_state_ || StreamState::Closed ==
write_state_) {
+ SPDLOG_WARN("Further write to {} is not allowded due to
stream-state={}", peer_address_,
+ static_cast<std::uint8_t>(write_state_));
+ return;
+ }
+ }
+
{
absl::MutexLock lk(&writes_mtx_);
- if (writes_.empty()) {
+ if (writes_.empty() && StreamState::Closing != write_state_) {
SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
return;
}
- bool expect = false;
- if (command_inflight_.compare_exchange_strong(expect, true,
std::memory_order_relaxed)) {
- write_ = std::move(*writes_.begin());
- writes_.erase(writes_.begin());
+ if (StreamState::Ready == write_state_) {
+ write_state_ = StreamState::Inflight;
+ }
+
+ if (writes_.empty()) {
+ // Tell server there is no more write requests.
+ StartWritesDone();
} else {
- SPDLOG_DEBUG("Another command is already on the wire. Peer={}",
peer_address_);
- return;
+ SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
+ StartWrite(&(writes_.front()));
}
}
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
write_.DebugString());
- StartWrite(&write_);
}
void TelemetryBidiReactor::fireClose() {
SPDLOG_INFO("{}#fireClose", peer_address_);
- if (StreamState::Active == stream_state_) {
- StartWritesDone();
- {
- absl::MutexLock lk(&stream_state_mtx_);
- if (StreamState::Active == stream_state_) {
- stream_state_cv_.Wait(&stream_state_mtx_);
+
+ {
+ // Acquire state lock
+ absl::MutexLock lk(&state_mtx_);
+
+ // Transition read state
+ switch (read_state_) {
+ case StreamState::Created:
+ case StreamState::Ready: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ state_cv_.SignalAll();
+ break;
+ }
+
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ read_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing: {
+ break;
+ }
+ case StreamState::Closed:
+ case StreamState::Error: {
+ state_cv_.SignalAll();
+ break;
+ }
+ }
+
+ // Transition write state
+ switch (write_state_) {
+ case StreamState::Created:
+ case StreamState::Ready:
+ case StreamState::Inflight: {
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closing));
+ write_state_ = StreamState::Closing;
+ break;
+ }
+ case StreamState::Closing: {
+ break;
+ }
+ case StreamState::Closed:
+ case StreamState::Error: {
+ state_cv_.SignalAll();
+ break;
+ }
+ }
+ }
+
+ if (StreamState::Closing == write_state_) {
+ tryWriteNext();
+ }
+
+ {
+ // Acquire state lock
+ absl::MutexLock lk(&state_mtx_);
+ while ((StreamState::Closed != read_state_ && StreamState::Error !=
read_state_) ||
+ (StreamState::Closed != write_state_ && StreamState::Error !=
write_state_)) {
+ if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
+ SPDLOG_WARN("StreamState CondVar timed out before getting signalled:
read-state={}, write-state={}",
+ static_cast<uint8_t>(read_state_),
static_cast<uint8_t>(write_state_));
}
}
}
@@ -308,6 +487,20 @@ void TelemetryBidiReactor::fireClose() {
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+ assert(StreamState::Closing == write_state_);
+
+ absl::MutexLock lk(&state_mtx_);
+ // Remove the hold for the write stream.
+ RemoveHold();
+
+ if (!ok) {
+ write_state_ = StreamState::Error;
+ SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
+ } else {
+ write_state_ = StreamState::Closed;
+ SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
+ }
+ state_cv_.SignalAll();
}
void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
@@ -315,7 +508,7 @@ void
TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
absl::MutexLock lk(&writes_mtx_);
writes_.emplace_back(command);
}
- fireWrite();
+ tryWriteNext();
}
/// Notifies the application that all operations associated with this RPC
@@ -334,9 +527,18 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
{
SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
- absl::MutexLock lk(&stream_state_mtx_);
- stream_state_ = StreamState::Closed;
- stream_state_cv_.SignalAll();
+ absl::MutexLock lk(&state_mtx_);
+ if (StreamState::Error != read_state_) {
+ SPDLOG_DEBUG("Change read-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ read_state_ = StreamState::Closed;
+ }
+ if (StreamState::Error != write_state_) {
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Closed));
+ write_state_ = StreamState::Closed;
+ }
+ state_cv_.SignalAll();
}
auto client = client_.lock();
@@ -349,4 +551,20 @@ void TelemetryBidiReactor::OnDone(const grpc::Status&
status) {
}
}
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
+ SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
+
+ if (!ok) {
+ absl::MutexLock lk(&state_mtx_);
+ SPDLOG_DEBUG("Change write-state {} --> {}",
static_cast<std::uint8_t>(read_state_),
+ static_cast<std::uint8_t>(StreamState::Error));
+ read_state_ = StreamState::Error;
+ state_cv_.SignalAll();
+ SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
+ return;
+ }
+
+ SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/client/include/ClientManager.h
b/cpp/source/client/include/ClientManager.h
index 56325fa4..02b232b2 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -22,14 +22,12 @@
#include <system_error>
#include "Client.h"
-#include "MessageExt.h"
#include "Metadata.h"
#include "ReceiveMessageCallback.h"
#include "RpcClient.h"
#include "Scheduler.h"
-#include "TelemetryBidiReactor.h"
+#include "SendResultCallback.h"
#include "TopicRouteData.h"
-#include "rocketmq/SendCallback.h"
#include "rocketmq/State.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -93,8 +91,10 @@ public:
virtual void receiveMessage(const std::string& target, const Metadata&
metadata, const ReceiveMessageRequest& request,
std::chrono::milliseconds timeout,
ReceiveMessageCallback callback) = 0;
- virtual bool send(const std::string& target_host, const Metadata& metadata,
SendMessageRequest& request,
- SendCallback cb) = 0;
+ virtual bool send(const std::string& target_host,
+ const Metadata& metadata,
+ SendMessageRequest& request,
+ SendResultCallback cb) = 0;
virtual std::error_code notifyClientTermination(const std::string&
target_host, const Metadata& metadata,
const
NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/ClientManagerImpl.h
b/cpp/source/client/include/ClientManagerImpl.h
index 653fcad3..5f1b27ca 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -20,7 +20,6 @@
#include <chrono>
#include <cstdint>
#include <functional>
-#include <future>
#include <memory>
#include <string>
#include <system_error>
@@ -29,18 +28,13 @@
#include "Client.h"
#include "ClientManager.h"
#include "InsecureCertificateVerifier.h"
-#include "InvocationContext.h"
#include "ReceiveMessageCallback.h"
#include "RpcClientImpl.h"
-#include "SchedulerImpl.h"
-#include "SendMessageContext.h"
-#include "TelemetryBidiReactor.h"
#include "ThreadPoolImpl.h"
#include "TopicRouteData.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "rocketmq/State.h"
@@ -54,7 +48,7 @@ public:
* effectively.
* @param resource_namespace Abstract resource namespace, in which this
client manager lives.
*/
- explicit ClientManagerImpl(std::string resource_namespace, bool withSsl =
true);
+ explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl =
true);
~ClientManagerImpl() override;
@@ -89,7 +83,7 @@ public:
bool send(const std::string& target_host,
const Metadata& metadata,
SendMessageRequest& request,
- SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
+ SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_);
/**
* Get a RpcClient according to the given target hosts, which follows scheme
specified
@@ -105,7 +99,7 @@ public:
RpcClientSharedPtr getRpcClient(const std::string& target_host, bool
need_heartbeat = true) override
LOCKS_EXCLUDED(rpc_clients_mtx_);
- static SendReceipt processSendResponse(const rmq::MessageQueue&
message_queue,
+ static SendResult processSendResponse(const rmq::MessageQueue& message_queue,
const SendMessageResponse& response,
std::error_code& ec);
@@ -242,7 +236,7 @@ private:
grpc::ChannelArguments channel_arguments_;
bool trace_{false};
- bool withSsl_;
+ bool with_ssl_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/client/include/SendResult.h
similarity index 88%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/client/include/SendResult.h
index 489df5ec..3596f61f 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/client/include/SendResult.h
@@ -16,20 +16,18 @@
*/
#pragma once
-#include <cstdint>
-#include <string>
-#include <utility>
+#include <system_error>
-#include "RocketMQ.h"
+#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
+struct SendResult {
+ std::error_code ec;
+ std::string target;
+ std::string message_id;
std::string transaction_id;
-
- std::string target;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/client/include/SendResultCallback.h
similarity index 80%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/client/include/SendResultCallback.h
index 489df5ec..182f649a 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/client/include/SendResultCallback.h
@@ -16,20 +16,12 @@
*/
#pragma once
-#include <cstdint>
-#include <string>
-#include <utility>
+#include <functional>
-#include "RocketMQ.h"
+#include "SendResult.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
-
- std::string transaction_id;
-
- std::string target;
-};
+using SendResultCallback = std::function<void(const SendResult&)>;
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/TelemetryBidiReactor.h
b/cpp/source/client/include/TelemetryBidiReactor.h
index 9fe65f31..3bdbe3d3 100644
--- a/cpp/source/client/include/TelemetryBidiReactor.h
+++ b/cpp/source/client/include/TelemetryBidiReactor.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
+#include <list>
#include <memory>
#include <utility>
#include <vector>
@@ -34,12 +35,29 @@ ROCKETMQ_NAMESPACE_BEGIN
enum class StreamState : std::uint8_t
{
Created = 0,
- Active = 1,
- ReadDone = 2,
- WriteDone = 3,
+ Ready = 1,
+ Inflight = 2,
+ Closing = 3,
Closed = 4,
+ Error = 5,
};
+/// write-stream-state: created --> ready --> inflight --> ready --> ...
+/// --> error
+/// --> closing --> closed
+/// --> closing --> closed
+/// --> error
+///
+///
+/// read-stream-state: created --> ready --> inflight --> inflight
+/// --> closing --> closed
+/// --> error
+/// --> closed
+/// requirement:
+/// 1, fireClose --> blocking await till bidireactor is closed;
+/// 2, when session is closed and client is still active, recreate a new
session to accept incoming commands from
+/// server 3, after writing the first Settings telemetry command, launch
the read directional stream
+///
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand,
TelemetryCommand>,
public
std::enable_shared_from_this<TelemetryBidiReactor> {
public:
@@ -47,24 +65,54 @@ public:
~TelemetryBidiReactor();
- void OnWriteDone(bool ok) override;
-
- void OnWritesDoneDone(bool ok) override;
-
- void OnReadDone(bool ok) override;
-
+ /// Notifies the application that all operations associated with this RPC
+ /// have completed and all Holds have been removed. OnDone provides the RPC
+ /// status outcome for both successful and failed RPCs and will be called in
+ /// all cases. If it is not called, it indicates an application-level problem
+ /// (like failure to remove a hold).
+ ///
+ /// \param[in] s The status outcome of this RPC
void OnDone(const grpc::Status& status) override;
- void fireRead();
+ /// Notifies the application that a read of initial metadata from the
+ /// server is done. If the application chooses not to implement this method,
+ /// it can assume that the initial metadata has been read before the first
+ /// call of OnReadDone or OnDone.
+ ///
+ /// \param[in] ok Was the initial metadata read successfully? If false, no
+ /// new read/write operation will succeed, and any further
+ /// Start* operations should not be called.
+ void OnReadInitialMetadataDone(bool /*ok*/) override;
+
+ /// Notifies the application that a StartRead operation completed.
+ ///
+ /// \param[in] ok Was it successful? If false, no new read/write operation
+ /// will succeed, and any further Start* should not be called.
+ void OnReadDone(bool ok) override;
- void fireWrite();
+ /// Notifies the application that a StartWrite or StartWriteLast operation
+ /// completed.
+ ///
+ /// \param[in] ok Was it successful? If false, no new read/write operation
+ /// will succeed, and any further Start* should not be called.
+ void OnWriteDone(bool ok) override;
- void fireClose();
+ /// Notifies the application that a StartWritesDone operation completed. Note
+ /// that this is only used on explicit StartWritesDone operations and not for
+ /// those that are implicitly invoked as part of a StartWriteLast.
+ ///
+ /// \param[in] ok Was it successful? If false, the application will later see
+ /// the failure reflected as a bad status in OnDone and no
+ /// further Start* should be called.
+ void OnWritesDoneDone(bool ok) override;
+ /// Core API method to initiate this bidirectional stream.
void write(TelemetryCommand command);
bool await();
+ void fireClose();
+
private:
grpc::ClientContext context_;
@@ -75,17 +123,15 @@ private:
/**
* @brief Buffered commands to write to server
+ *
+ * TODO: move buffered commands to a shared container, which may survive
+ * multiple TelemetryBidiReactor lifecycles.
*/
- std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
+ std::list<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
absl::Mutex writes_mtx_;
/**
- * @brief The command that is currently being written back to server.
- */
- TelemetryCommand write_;
-
- /**
- * @brief Each TelemetryBidiReactor belongs to a specific client as its
owner.
+ * @brief Each TelemetryBidiReactor belongs to a specific client as its
owner.
*/
std::weak_ptr<Client> client_;
@@ -94,18 +140,12 @@ private:
*/
std::string peer_address_;
- /**
- * @brief Indicate if there is a command being written to network.
- */
- std::atomic_bool command_inflight_{false};
-
- StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
- absl::Mutex stream_state_mtx_;
- absl::CondVar stream_state_cv_;
+ StreamState read_state_ GUARDED_BY(state_mtx_);
+ StreamState write_state_ GUARDED_BY(state_mtx_);
+ absl::Mutex state_mtx_;
+ absl::CondVar state_cv_;
- bool server_setting_received_
GUARDED_BY(server_setting_received_mtx_){false};
- absl::Mutex server_setting_received_mtx_;
- absl::CondVar server_setting_received_cv_;
+ void changeStreamStateThenNotify(StreamState state);
void onVerifyMessageResult(TelemetryCommand command);
@@ -116,6 +156,14 @@ private:
void applyPublishingConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
void applySubscriptionConfig(const rmq::Settings& settings,
std::shared_ptr<Client> client);
+
+ /// Start the read stream.
+ ///
+ /// Once got the OnReadDone and status is OK, call StartRead immediately.
+ void fireRead();
+
+ /// Attempt to write pending telemetry command to server.
+ void tryWriteNext() LOCKS_EXCLUDED(state_mtx_, writes_mtx_);
};
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/exports.map b/cpp/source/exports.map
new file mode 100644
index 00000000..99ae5d1d
--- /dev/null
+++ b/cpp/source/exports.map
@@ -0,0 +1,6 @@
+{
+ global:
+ *rocketmq*;
+ local:
+ *;
+};
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/rocketmq/FifoContext.cpp
similarity index 70%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/FifoContext.cpp
index 489df5ec..1cf5bab2 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/FifoContext.cpp
@@ -14,22 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
+#include "FifoContext.h"
-#include <cstdint>
-#include <string>
-#include <utility>
-
-#include "RocketMQ.h"
+#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
-
- std::string transaction_id;
+FifoContext::FifoContext(MessageConstPtr message, SendCallback callback)
+ : message(std::move(message)), callback(callback) {
+}
- std::string target;
-};
+FifoContext::FifoContext(FifoContext&& rhs) noexcept {
+ this->message = std::move(rhs.message);
+ this->callback = rhs.callback;
+}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducer.cpp
b/cpp/source/rocketmq/FifoProducer.cpp
new file mode 100644
index 00000000..b52630b0
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducer.cpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "rocketmq/FifoProducer.h"
+
+#include <cstddef>
+#include <memory>
+
+#include "FifoProducerImpl.h"
+#include "ProducerImpl.h"
+#include "StaticNameServerResolver.h"
+#include "rocketmq/Configuration.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+FifoProducerBuilder FifoProducer::newBuilder() {
+ return {};
+}
+
+FifoProducerBuilder::FifoProducerBuilder() :
producer_impl_(std::make_shared<ProducerImpl>()) {
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConfiguration(Configuration
configuration) {
+ auto name_server_resolver =
std::make_shared<StaticNameServerResolver>(configuration.endpoints());
+ producer_impl_->withNameServerResolver(std::move(name_server_resolver));
+ producer_impl_->withCredentialsProvider(configuration.credentialsProvider());
+ producer_impl_->withRequestTimeout(configuration.requestTimeout());
+ producer_impl_->withSsl(configuration.withSsl());
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withTopics(const
std::vector<std::string>& topics) {
+ producer_impl_->withTopics(topics);
+ return *this;
+}
+
+FifoProducerBuilder& FifoProducerBuilder::withConcurrency(std::size_t
concurrency) {
+ this->impl_ = std::make_shared<FifoProducerImpl>(producer_impl_,
concurrency);
+ return *this;
+}
+
+FifoProducer FifoProducerBuilder::build() {
+ FifoProducer fifo_producer(this->impl_);
+ fifo_producer.start();
+ return fifo_producer;
+}
+
+void FifoProducer::start() {
+ impl_->internalProducer()->start();
+}
+
+void FifoProducer::send(MessageConstPtr message, SendCallback callback) {
+ impl_->send(std::move(message), callback);
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/rocketmq/FifoProducerImpl.cpp
similarity index 64%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/FifoProducerImpl.cpp
index 489df5ec..ad08c4d9 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/FifoProducerImpl.cpp
@@ -14,22 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
+#include "FifoProducerImpl.h"
-#include <cstdint>
-#include <string>
#include <utility>
-#include "RocketMQ.h"
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
+void FifoProducerImpl::send(MessageConstPtr message, SendCallback callback) {
+ auto& group = message->group();
+ std::size_t hash = hash_fn_(group);
+ std::size_t slot = hash % concurrency_;
- std::string transaction_id;
-
- std::string target;
-};
+ FifoContext context(std::move(message), callback);
+ partitions_[slot]->add(std::move(context));
+}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/FifoProducerPartition.cpp
b/cpp/source/rocketmq/FifoProducerPartition.cpp
new file mode 100644
index 00000000..8a2f06ff
--- /dev/null
+++ b/cpp/source/rocketmq/FifoProducerPartition.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "FifoProducerPartition.h"
+
+#include "absl/synchronization/mutex.h"
+
+#include <atomic>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void FifoProducerPartition::add(FifoContext&& context) {
+ {
+ absl::MutexLock lk(&messages_mtx_);
+ messages_.emplace_back(std::move(context));
+ SPDLOG_DEBUG("{} has {} pending messages after #add", name_,
messages_.size());
+ }
+
+ trySend();
+}
+
+void FifoProducerPartition::trySend() {
+ bool expected = false;
+ if (inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
+ absl::MutexLock lk(&messages_mtx_);
+
+ if (messages_.empty()) {
+ SPDLOG_DEBUG("There is no more messages to send");
+ return;
+ }
+
+ FifoContext& ctx = messages_.front();
+ MessageConstPtr message = std::move(ctx.message);
+ SendCallback send_callback = ctx.callback;
+
+ std::shared_ptr<FifoProducerPartition> partition = shared_from_this();
+ auto fifo_callback = [=](const std::error_code& ec, const SendReceipt&
receipt) mutable {
+ partition->onComplete(ec, receipt, send_callback);
+ };
+ SPDLOG_DEBUG("Sending FIFO message from {}", name_);
+ producer_->send(std::move(message), fifo_callback);
+ messages_.pop_front();
+ SPDLOG_DEBUG("In addition to the inflight one, there is {} messages
pending in {}", messages_.size(), name_);
+ } else {
+ SPDLOG_DEBUG("There is an inflight message");
+ }
+}
+
+void FifoProducerPartition::onComplete(const std::error_code& ec, const
SendReceipt& receipt, SendCallback& callback) {
+ if (ec) {
+ SPDLOG_INFO("{} completed with a failure: {}", name_, ec.message());
+ } else {
+ SPDLOG_DEBUG("{} completed OK", name_);
+ }
+
+ if (!ec) {
+ callback(ec, receipt);
+ // update inflight status
+ bool expected = true;
+ if (inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
+ trySend();
+ } else {
+ SPDLOG_ERROR("{}: Unexpected inflight status", name_);
+ }
+ return;
+ }
+
+ // Put the message back to the front of the list
+ SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ FifoContext retry_context(std::move(receipt_mut.message), callback);
+ {
+ absl::MutexLock lk(&messages_mtx_);
+ messages_.emplace_front(std::move(retry_context));
+ }
+
+ // Update inflight status
+ bool expected = true;
+ if (inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
+ trySend();
+ } else {
+ SPDLOG_ERROR("Unexpected inflight status");
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 78d812ed..907d0a28 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -21,12 +21,8 @@
#include <system_error>
#include <utility>
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
-#include "MixAll.h"
#include "ProducerImpl.h"
#include "StaticNameServerResolver.h"
-#include "absl/strings/str_split.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "rocketmq/Transaction.h"
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index 73130161..34c5b29c 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,38 +17,27 @@
#include "ProducerImpl.h"
#include <algorithm>
-#include <apache/rocketmq/v2/definition.pb.h>
-
#include <atomic>
#include <cassert>
#include <chrono>
-#include <limits>
#include <memory>
#include <system_error>
#include <utility>
-#include "Client.h"
-#include "MessageGroupQueueSelector.h"
-#include "MetadataConstants.h"
+#include "apache/rocketmq/v2/definition.pb.h"
#include "MixAll.h"
#include "Protocol.h"
#include "PublishInfoCallback.h"
-#include "RpcClient.h"
#include "SendContext.h"
-#include "SendMessageContext.h"
#include "Signature.h"
-#include "Tag.h"
#include "TracingUtility.h"
#include "TransactionImpl.h"
-#include "UniqueIdGenerator.h"
#include "UtilAll.h"
-#include "absl/strings/str_join.h"
#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendReceipt.h"
-#include "rocketmq/Tracing.h"
#include "rocketmq/Transaction.h"
#include "rocketmq/TransactionChecker.h"
@@ -203,19 +192,30 @@ void ProducerImpl::wrapSendMessageRequest(const Message&
message, SendMessageReq
SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec)
noexcept {
ensureRunning(ec);
if (ec) {
- return {};
+ SPDLOG_WARN("Producer is not running");
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto topic_publish_info = getPublishInfo(message->topic());
if (!topic_publish_info) {
+ SPDLOG_WARN("Route of topic[{}] is not found", message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
std::vector<rmq::MessageQueue> message_queue_list;
- if
(!topic_publish_info->selectMessageQueues(absl::make_optional<std::string>(),
message_queue_list)) {
+ // null_opt_t
+ absl::optional<std::string> message_group{};
+ if (!topic_publish_info->selectMessageQueues(message_group,
message_queue_list)) {
+ SPDLOG_WARN("Failed to select an addressable message queue for topic[{}]",
message->topic());
ec = ErrorCode::NotFound;
- return {};
+ SendReceipt send_receipt{};
+ send_receipt.message = std::move(message);
+ return send_receipt;
}
auto mtx = std::make_shared<absl::Mutex>();
@@ -224,9 +224,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message,
std::error_code& ec) noe
SendReceipt send_receipt;
// Define callback
- auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) {
+ auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) mutable {
ec = code;
- send_receipt = receipt;
+ SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ send_receipt.message = std::move(receipt_mut.message);
{
absl::MutexLock lk(mtx.get());
completed = true;
@@ -251,6 +252,7 @@ void ProducerImpl::send(MessageConstPtr message,
SendCallback cb) {
ensureRunning(ec);
if (ec) {
SendReceipt send_receipt;
+ send_receipt.message = std::move(message);
cb(ec, send_receipt);
}
@@ -264,6 +266,7 @@ void ProducerImpl::send(MessageConstPtr message,
SendCallback cb) {
// No route entries of the given topic is available
if (ec) {
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -271,6 +274,7 @@ void ProducerImpl::send(MessageConstPtr message,
SendCallback cb) {
if (!publish_info) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -280,6 +284,7 @@ void ProducerImpl::send(MessageConstPtr message,
SendCallback cb) {
if (!publish_info->selectMessageQueues(ptr->group(), message_queue_list)) {
std::error_code ec = ErrorCode::NotFound;
SendReceipt send_receipt;
+ send_receipt.message = std::move(ptr);
cb(ec, send_receipt);
return;
}
@@ -338,12 +343,12 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext>
context) {
Metadata metadata;
Signature::sign(client_config_, metadata);
- auto callback = [context](const std::error_code& ec, const SendReceipt&
send_receipt) {
- if (ec) {
- context->onFailure(ec);
+ auto callback = [context](const SendResult& send_result) {
+ if (send_result.ec) {
+ context->onFailure(send_result.ec);
return;
}
- context->onSuccess(send_receipt);
+ context->onSuccess(send_result);
};
client_manager_->send(target, metadata, request, callback);
@@ -354,12 +359,14 @@ void ProducerImpl::send0(MessageConstPtr message,
SendCallback callback, std::ve
std::error_code ec;
validate(*message, ec);
if (ec) {
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
if (list.empty()) {
ec = ErrorCode::NotFound;
+ send_receipt.message = std::move(message);
callback(ec, send_receipt);
return;
}
diff --git a/cpp/source/rocketmq/SendContext.cpp
b/cpp/source/rocketmq/SendContext.cpp
index 385a1a99..bd97384d 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -21,41 +21,44 @@
#include "ProducerImpl.h"
#include "PublishStats.h"
#include "Tag.h"
-#include "TransactionImpl.h"
-#include "opencensus/trace/propagation/trace_context.h"
#include "opencensus/trace/span.h"
-#include "rocketmq/Logger.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/SendReceipt.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
+void SendContext::onSuccess(const SendResult& send_result) noexcept {
{
// Mark end of send-message span.
span_.SetStatus(opencensus::trace::StatusCode::OK);
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(),
MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(),
MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(),
publisher->config().client_id},
+ {Tag::clientIdTag(),
producer->config().client_id},
{Tag::invocationStatusTag(), "success"},
});
}
//
send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
- std::error_code ec;
- callback_(ec, send_receipt);
+ SendReceipt send_receipt = {};
+ send_receipt.target = send_result.target;
+ send_receipt.message_id = send_result.message_id;
+ send_receipt.transaction_id = send_result.transaction_id;
+ send_receipt.message = std::move(message_);
+ callback_(send_result.ec, send_receipt);
}
void SendContext::onFailure(const std::error_code& ec) noexcept {
@@ -65,38 +68,36 @@ void SendContext::onFailure(const std::error_code& ec)
noexcept {
span_.End();
}
- auto publisher = producer_.lock();
- if (!publisher) {
+ auto producer = producer_.lock();
+ if (!producer) {
+ SPDLOG_WARN("Producer has been destructed");
return;
}
// Collect metrics
{
auto duration = std::chrono::steady_clock::now() - request_time_;
- opencensus::stats::Record({{publisher->stats().latency(),
MixAll::millisecondsOf(duration)}},
+ opencensus::stats::Record({{producer->stats().latency(),
MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), message_->topic()},
- {Tag::clientIdTag(),
publisher->config().client_id},
+ {Tag::clientIdTag(),
producer->config().client_id},
{Tag::invocationStatusTag(), "failure"},
});
}
- if (++attempt_times_ >= publisher->maxAttemptTimes()) {
- SPDLOG_WARN("Retried {} times, which exceeds the limit: {}",
attempt_times_, publisher->maxAttemptTimes());
- callback_(ec, {});
- return;
- }
-
- std::shared_ptr<ProducerImpl> producer = producer_.lock();
- if (!producer) {
- SPDLOG_WARN("Producer has been destructed");
- callback_(ec, {});
+ if (++attempt_times_ >= producer->maxAttemptTimes()) {
+ SPDLOG_WARN("Retried {} times, which exceeds the limit: {}",
attempt_times_, producer->maxAttemptTimes());
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
if (candidates_.empty()) {
SPDLOG_WARN("No alternative hosts to perform additional retries");
- callback_(ec, {});
+ SendReceipt receipt{};
+ receipt.message = std::move(message_);
+ callback_(ec, receipt);
return;
}
@@ -106,7 +107,7 @@ void SendContext::onFailure(const std::error_code& ec)
noexcept {
auto ctx = shared_from_this();
// If publish message requests are throttled, retry after backoff
if (ErrorCode::TooManyRequests == ec) {
- auto&& backoff = publisher->backoff(attempt_times_);
+ auto&& backoff = producer->backoff(attempt_times_);
SPDLOG_DEBUG("Publish message[topic={}, message-id={}] is throttled. Retry
after {}ms", message_->topic(),
message_->id(), MixAll::millisecondsOf(backoff));
auto retry_cb = [=]() { producer->sendImpl(ctx); };
diff --git a/cpp/source/rocketmq/include/ClientImpl.h
b/cpp/source/rocketmq/include/ClientImpl.h
index c266047a..d7693962 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -94,8 +94,8 @@ public:
client_config_.request_timeout = absl::FromChrono(request_timeout);
}
- void withSsl(bool enable) {
- client_config_.withSsl = enable;
+ void withSsl(bool with_ssl) {
+ client_config_.withSsl = with_ssl;
}
/**
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/rocketmq/include/FifoContext.h
similarity index 74%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/include/FifoContext.h
index 489df5ec..2fc3492e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/include/FifoContext.h
@@ -16,20 +16,19 @@
*/
#pragma once
-#include <cstdint>
-#include <string>
-#include <utility>
-
-#include "RocketMQ.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/RocketMQ.h"
+#include "rocketmq/SendCallback.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
+struct FifoContext {
+ MessageConstPtr message;
+ SendCallback callback;
- std::string transaction_id;
+ FifoContext(MessageConstPtr message, SendCallback callback);
- std::string target;
+ FifoContext(FifoContext&& rhs) noexcept;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerImpl.h
b/cpp/source/rocketmq/include/FifoProducerImpl.h
new file mode 100644
index 00000000..ffc9c8a6
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerImpl.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "FifoProducerPartition.h"
+#include "ProducerImpl.h"
+#include "fmt/format.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/SendCallback.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerImpl : std::enable_shared_from_this<FifoProducerImpl> {
+public:
+ FifoProducerImpl(std::shared_ptr<ProducerImpl> producer, std::size_t
concurrency)
+ : producer_(producer), concurrency_(concurrency),
partitions_(concurrency) {
+ for (auto i = 0; i < concurrency; i++) {
+ partitions_[i] = std::make_shared<FifoProducerPartition>(producer_,
fmt::format("slot-{}", i));
+ }
+ };
+
+ void send(MessageConstPtr message, SendCallback callback);
+
+ std::shared_ptr<ProducerImpl>& internalProducer() {
+ return producer_;
+ }
+
+private:
+ std::shared_ptr<ProducerImpl> producer_;
+ std::vector<std::shared_ptr<FifoProducerPartition>> partitions_;
+ std::size_t concurrency_;
+ std::hash<std::string> hash_fn_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/include/FifoProducerPartition.h
b/cpp/source/rocketmq/include/FifoProducerPartition.h
new file mode 100644
index 00000000..8d0e00d8
--- /dev/null
+++ b/cpp/source/rocketmq/include/FifoProducerPartition.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "absl/base/internal/thread_annotations.h"
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <system_error>
+
+#include "FifoContext.h"
+#include "ProducerImpl.h"
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+#include "rocketmq/SendCallback.h"
+#include "rocketmq/SendReceipt.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class FifoProducerPartition : public
std::enable_shared_from_this<FifoProducerPartition> {
+public:
+ FifoProducerPartition(std::shared_ptr<ProducerImpl> producer, std::string&&
name)
+ : producer_(producer), name_(std::move(name)) {
+ }
+
+ void add(FifoContext&& context) LOCKS_EXCLUDED(messages_mtx_);
+
+ void trySend() LOCKS_EXCLUDED(messages_mtx_);
+
+ void onComplete(const std::error_code& ec, const SendReceipt& receipt,
SendCallback& callback);
+
+private:
+ std::shared_ptr<ProducerImpl> producer_;
+ std::list<FifoContext> messages_ GUARDED_BY(messages_mtx_);
+ absl::Mutex messages_mtx_;
+ std::atomic_bool inflight_{false};
+ std::string name_;
+};
+
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index d7260a93..b572f20d 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -16,28 +16,23 @@
*/
#pragma once
-#include <chrono>
#include <memory>
-#include <mutex>
#include <string>
#include <system_error>
#include "ClientImpl.h"
-#include "ClientManagerImpl.h"
#include "MixAll.h"
#include "PublishInfoCallback.h"
+#include "PublishStats.h"
#include "SendContext.h"
#include "TopicPublishInfo.h"
#include "TransactionImpl.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
-#include "absl/strings/string_view.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
#include "rocketmq/SendReceipt.h"
-#include "rocketmq/State.h"
#include "rocketmq/TransactionChecker.h"
-#include "PublishStats.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -53,8 +48,22 @@ public:
void shutdown() override;
+ /**
+ * Note we requrie application to transfer ownership of the message to send
to avoid concurrent modification during
+ * sent.
+ *
+ * Regardless of the send result, SendReceipt would have the
std::unique_ptr<const Message>, facilliating
+ * application to conduct customized retry policy.
+ */
SendReceipt send(MessageConstPtr message, std::error_code& ec) noexcept;
+ /**
+ * Note we requrie application to transfer ownership of the message to send
to avoid concurrent modification during
+ * sent.
+ *
+ * Regardless of the send result, SendReceipt would have the
std::unique_ptr<const Message>, facilliating
+ * application to conduct customized retry policy.
+ */
void send(MessageConstPtr message, SendCallback callback);
void setTransactionChecker(TransactionChecker checker);
@@ -64,6 +73,13 @@ public:
return absl::make_unique<TransactionImpl>(producer);
}
+ /**
+ * Note we requrie application to transfer ownership of the message to send
to avoid concurrent modification during
+ * sent.
+ *
+ * TODO: Refine this API. Current API is not good enough as it cannot handle
the message back to its caller on publish
+ * failure.
+ */
void send(MessageConstPtr message, std::error_code& ec, Transaction&
transaction);
/**
diff --git a/cpp/source/rocketmq/include/SendContext.h
b/cpp/source/rocketmq/include/SendContext.h
index 4c05cebe..4067532b 100644
--- a/cpp/source/rocketmq/include/SendContext.h
+++ b/cpp/source/rocketmq/include/SendContext.h
@@ -19,16 +19,12 @@
#include <memory>
#include <system_error>
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-#include "opencensus/trace/span.h"
-
#include "Protocol.h"
+#include "SendResult.h"
#include "TransactionImpl.h"
-#include "rocketmq/ErrorCode.h"
+#include "opencensus/trace/span.h"
#include "rocketmq/Message.h"
#include "rocketmq/SendCallback.h"
-#include "rocketmq/SendReceipt.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -47,7 +43,7 @@ public:
span_(opencensus::trace::Span::BlankSpan()) {
}
- void onSuccess(const SendReceipt& send_receipt) noexcept;
+ void onSuccess(const SendResult& send_result) noexcept;
void onFailure(const std::error_code& ec) noexcept;
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 45aa61b9..7fc63b6d 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -25,7 +25,7 @@
using namespace std::chrono;
ROCKETMQ_NAMESPACE_BEGIN
-class SimpleConsumerImpl : public ClientImpl, public
std::enable_shared_from_this<SimpleConsumerImpl> {
+class SimpleConsumerImpl : virtual public ClientImpl, public
std::enable_shared_from_this<SimpleConsumerImpl> {
public:
SimpleConsumerImpl(std::string group);
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel
b/cpp/source/rocketmq/tests/BUILD.bazel
index a8d10e92..74751c35 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -59,4 +59,12 @@ cc_test(
"ConsumeMessageServiceTest.cpp",
],
deps = base_deps,
+)
+
+cc_test(
+ name = "optional_test",
+ srcs = [
+ "OptionalTest.cpp",
+ ],
+ deps = base_deps
)
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h
b/cpp/source/rocketmq/tests/OptionalTest.cpp
similarity index 72%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/source/rocketmq/tests/OptionalTest.cpp
index 489df5ec..1266a73e 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/source/rocketmq/tests/OptionalTest.cpp
@@ -14,22 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
+#include "gtest/gtest.h"
+#include "absl/types/optional.h"
+#include "rocketmq/RocketMQ.h"
-#include <cstdint>
#include <string>
-#include <utility>
-
-#include "RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
-struct SendReceipt {
- std::string message_id;
+TEST(OptionalTest, test_optional) {
+ absl::optional<std::string> opt{};
+ ASSERT_EQ(false, opt.has_value());
- std::string transaction_id;
+ auto opt2 = absl::make_optional<std::string>();
+ ASSERT_EQ(true, opt2.has_value());
+}
- std::string target;
-};
+ROCKETMQ_NAMESPACE_END
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/stats/MetricBidiReactor.cpp
b/cpp/source/stats/MetricBidiReactor.cpp
index 22363bd0..e6921378 100644
--- a/cpp/source/stats/MetricBidiReactor.cpp
+++ b/cpp/source/stats/MetricBidiReactor.cpp
@@ -18,10 +18,10 @@
#include <chrono>
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include "OpencensusExporter.h"
#include "Signature.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -43,12 +43,15 @@ MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client>
client, std::weak_ptr
return;
}
exporter_ptr->stub()->async()->Export(&context_, this);
+ AddHold();
StartCall();
}
void MetricBidiReactor::OnReadDone(bool ok) {
if (!ok) {
SPDLOG_WARN("Failed to read response");
+ // match the AddHold() call in MetricBidiReactor::fireRead
+ RemoveHold();
return;
}
SPDLOG_DEBUG("OnReadDone OK");
@@ -56,16 +59,32 @@ void MetricBidiReactor::OnReadDone(bool ok) {
}
void MetricBidiReactor::OnWriteDone(bool ok) {
+ {
+ bool expected = true;
+ if (!inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
+ SPDLOG_WARN("Illegal command-inflight state");
+ return;
+ }
+ }
+
if (!ok) {
SPDLOG_WARN("Failed to report metrics");
+ // match AddHold() call in MetricBidiReactor::MetricBidiReactor
+ RemoveHold();
return;
}
SPDLOG_DEBUG("OnWriteDone OK");
+
+ // If the read stream has not started yet, start it now.
fireRead();
- bool expected = true;
- if (inflight_.compare_exchange_strong(expected, false,
std::memory_order_relaxed)) {
- fireWrite();
+
+ // Remove the one that been written.
+ {
+ absl::MutexLock lk(&requests_mtx_);
+ requests_.pop_front();
}
+
+ tryWriteNext();
}
void MetricBidiReactor::OnDone(const grpc::Status& s) {
@@ -89,13 +108,13 @@ void MetricBidiReactor::write(ExportMetricsServiceRequest
request) {
SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
{
absl::MutexLock lk(&requests_mtx_);
- requests_.emplace_back(std::move(request));
+ requests_.push_back(std::move(request));
}
- fireWrite();
+ tryWriteNext();
}
-void MetricBidiReactor::fireWrite() {
+void MetricBidiReactor::tryWriteNext() {
{
absl::MutexLock lk(&requests_mtx_);
if (requests_.empty()) {
@@ -107,16 +126,15 @@ void MetricBidiReactor::fireWrite() {
bool expected = false;
if (inflight_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
absl::MutexLock lk(&requests_mtx_);
- request_ = std::move(*requests_.begin());
- requests_.erase(requests_.begin());
SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
- StartWrite(&request_);
+ StartWrite(&(requests_.front()));
}
}
void MetricBidiReactor::fireRead() {
bool expected = false;
if (read_.compare_exchange_strong(expected, true,
std::memory_order_relaxed)) {
+ AddHold();
StartRead(&response_);
}
}
diff --git a/cpp/source/stats/include/MetricBidiReactor.h
b/cpp/source/stats/include/MetricBidiReactor.h
index 0a10cd88..e4d75344 100644
--- a/cpp/source/stats/include/MetricBidiReactor.h
+++ b/cpp/source/stats/include/MetricBidiReactor.h
@@ -16,6 +16,8 @@
*/
#pragma once
+#include <list>
+
#include "Client.h"
#include "grpcpp/grpcpp.h"
#include "grpcpp/impl/codegen/client_callback.h"
@@ -25,12 +27,17 @@ ROCKETMQ_NAMESPACE_BEGIN
class OpencensusExporter;
-using ExportMetricsServiceRequest =
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-using ExportMetricsServiceResponse =
opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+using ExportMetricsServiceRequest =
+ opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse =
+ opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
-class MetricBidiReactor : public
grpc::ClientBidiReactor<ExportMetricsServiceRequest,
ExportMetricsServiceResponse> {
+class MetricBidiReactor
+ : public grpc::ClientBidiReactor<ExportMetricsServiceRequest,
+ ExportMetricsServiceResponse> {
public:
- MetricBidiReactor(std::weak_ptr<Client> client,
std::weak_ptr<OpencensusExporter> exporter);
+ MetricBidiReactor(std::weak_ptr<Client> client,
+ std::weak_ptr<OpencensusExporter> exporter);
/// Notifies the application that a StartRead operation completed.
///
@@ -52,7 +59,7 @@ public:
/// (like failure to remove a hold).
///
/// \param[in] s The status outcome of this RPC
- void OnDone(const grpc::Status& /*s*/) override;
+ void OnDone(const grpc::Status & /*s*/) override;
void write(ExportMetricsServiceRequest request)
LOCKS_EXCLUDED(requests_mtx_);
@@ -61,9 +68,8 @@ private:
std::weak_ptr<OpencensusExporter> exporter_;
grpc::ClientContext context_;
- ExportMetricsServiceRequest request_;
-
- std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+ /// Pending ExportMetricsServiceRequest items to write to server
+ std::list<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
absl::Mutex requests_mtx_;
std::atomic_bool inflight_{false};
@@ -71,7 +77,7 @@ private:
ExportMetricsServiceResponse response_;
- void fireWrite();
+ void tryWriteNext();
void fireRead();
};
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/gen_compile_commands.sh
old mode 100644
new mode 100755
similarity index 83%
copy from cpp/tools/trouble_shooting.sh
copy to cpp/tools/gen_compile_commands.sh
index 06d3795f..5ffc239c
--- a/cpp/tools/trouble_shooting.sh
+++ b/cpp/tools/gen_compile_commands.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,6 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+TOOLS_DIR=$(dirname "$0")
+WORKSPACE_DIR=$(dirname "$TOOLS_DIR")
+cd $WORKSPACE_DIR
-export GRPC_VERBOSITY=debug
-export GRPC_TRACE=tcp,http,api
\ No newline at end of file
+bazel run @hedron_compile_commands//:refresh_all
\ No newline at end of file
diff --git a/cpp/tools/trouble_shooting.sh b/cpp/tools/trouble_shooting.sh
old mode 100644
new mode 100755