This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 093d793e Python 5.0.6 (#1004)
093d793e is described below
commit 093d793ecaf5a72afeb80c1b1eeedb7eb5b3e9c5
Author: zhouli11 <[email protected]>
AuthorDate: Thu May 29 11:05:18 2025 +0800
Python 5.0.6 (#1004)
* Python 5.0.6
---
python/rocketmq/v5/client/balancer/queue_selector.py | 1 -
python/rocketmq/v5/client/client.py | 4 ++--
python/rocketmq/v5/client/metrics/client_metrics.py | 2 ++
python/rocketmq/v5/consumer/simple_consumer.py | 4 +++-
python/rocketmq/v5/producer/producer.py | 1 -
python/rocketmq/v5/util/misc.py | 2 +-
python/setup.py | 2 +-
7 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/python/rocketmq/v5/client/balancer/queue_selector.py
b/python/rocketmq/v5/client/balancer/queue_selector.py
index bd48f21c..98780a80 100644
--- a/python/rocketmq/v5/client/balancer/queue_selector.py
+++ b/python/rocketmq/v5/client/balancer/queue_selector.py
@@ -68,7 +68,6 @@ class QueueSelector:
def select_queue_by_hash_key(self, key):
hash_object = hashlib.sha256(key.encode('utf-8'))
hash_code = int.from_bytes(hash_object.digest(), byteorder='big')
- print(f"hashcode: {hash_code}")
return self.__message_queues[hash_code % len(self.__message_queues)]
def all_queues(self):
diff --git a/python/rocketmq/v5/client/client.py
b/python/rocketmq/v5/client/client.py
index d039a9a0..49bb6b3a 100644
--- a/python/rocketmq/v5/client/client.py
+++ b/python/rocketmq/v5/client/client.py
@@ -227,7 +227,6 @@ class Client:
asyncio.set_event_loop(self._rpc_channel_io_loop())
while True:
if self.__client_thread_task_enabled is True:
- self.__sync_setting_scheduler_threading_event.wait(5)
logger.debug(f"{self.__str__()} run update setting in
scheduler.")
try:
all_endpoints = self.__get_all_endpoints().values()
@@ -238,6 +237,7 @@ class Client:
logger.error(
f"{self.__str__()} scheduler set setting raise
exception: {e}"
)
+ self.__sync_setting_scheduler_threading_event.wait(300)
else:
break
logger.info(f"{self.__str__()} stop scheduler for update setting
success.")
@@ -273,7 +273,7 @@ class Client:
thread_name_prefix=f"client_callback_worker-{self.__client_id}")
logger.info(f"{self.__str__()} start callback executor success.
max_workers:{workers}")
except Exception as e:
- print(f"{self.__str__()} start async rpc callback raise exception:
{e}")
+ logger.error(f"{self.__str__()} start async rpc callback raise
exception: {e}")
raise e
@staticmethod
diff --git a/python/rocketmq/v5/client/metrics/client_metrics.py
b/python/rocketmq/v5/client/metrics/client_metrics.py
index 210631ee..22bef06f 100644
--- a/python/rocketmq/v5/client/metrics/client_metrics.py
+++ b/python/rocketmq/v5/client/metrics/client_metrics.py
@@ -28,6 +28,7 @@ from rocketmq.grpc_protocol import Metric
from rocketmq.v5.client.connection import RpcEndpoints
from rocketmq.v5.log import logger
from rocketmq.v5.model import HistogramEnum, MessageMetricType, MetricContext
+from rocketmq.v5.util import Signature
class ClientMetrics:
@@ -142,6 +143,7 @@ class ClientMetrics:
endpoint=self.__endpoints.__str__(),
insecure=True,
timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT,
+ headers=Signature.metadata(self.__client_configuration,
self.__client_id)
)
# create a metric reader and set the export interval
reader = PeriodicExportingMetricReader(
diff --git a/python/rocketmq/v5/consumer/simple_consumer.py
b/python/rocketmq/v5/consumer/simple_consumer.py
index 7a19c903..d00bb1a7 100644
--- a/python/rocketmq/v5/consumer/simple_consumer.py
+++ b/python/rocketmq/v5/consumer/simple_consumer.py
@@ -42,6 +42,7 @@ class SimpleConsumer(Client):
consumer_group,
subscription: dict = None,
await_duration=20,
+ tls_enable=False
):
if consumer_group is None or consumer_group.strip() == "":
raise IllegalArgumentException("consumerGroup should not be null")
@@ -56,6 +57,7 @@ class SimpleConsumer(Client):
client_configuration,
None if subscription is None else subscription.keys(),
ClientType.SIMPLE_CONSUMER,
+ tls_enable
)
self.__consumer_group = consumer_group
self.__await_duration = await_duration # long polling timeout, seconds
@@ -98,7 +100,7 @@ class SimpleConsumer(Client):
"unable to remove subscription because simple consumer is not
running"
)
- if topic in self.__subscriptions:
+ if self.__subscriptions.contains(topic):
self.__subscriptions.remove(topic)
self._remove_unused_topic_route_data(topic)
diff --git a/python/rocketmq/v5/producer/producer.py
b/python/rocketmq/v5/producer/producer.py
index 5072d90a..4f5f089f 100644
--- a/python/rocketmq/v5/producer/producer.py
+++ b/python/rocketmq/v5/producer/producer.py
@@ -303,7 +303,6 @@ class Producer(Client):
def __send(self, message: Message, topic_queue, attempt=1) -> SendReceipt:
req = self.__send_req(message)
send_context = self.client_metrics.send_before(message.topic)
- print(f"{topic_queue}")
send_message_future = self.rpc_client.send_message_async(
topic_queue.endpoints,
req,
diff --git a/python/rocketmq/v5/util/misc.py b/python/rocketmq/v5/util/misc.py
index e09faf99..79d6be34 100644
--- a/python/rocketmq/v5/util/misc.py
+++ b/python/rocketmq/v5/util/misc.py
@@ -29,7 +29,7 @@ class Misc:
__OS_NAME = None
TOPIC_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
CONSUMER_GROUP_PATTERN = compile(r"^[%a-zA-Z0-9_-]+$")
- SDK_VERSION = "5.0.5"
+ SDK_VERSION = "5.0.6"
@staticmethod
def sdk_language():
diff --git a/python/setup.py b/python/setup.py
index 75f95b68..02a125be 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -17,7 +17,7 @@ from setuptools import find_packages, setup
setup(
name='rocketmq-python-client',
- version='5.0.5',
+ version='5.0.6',
packages=find_packages(),
install_requires=[
"grpcio>=1.5.0",