This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 0cff6d276 feat(python): add TLS and auth args to getting-started examples (#2754)
0cff6d276 is described below
commit 0cff6d276c65faf3f7703a2e8cef658ad21b6e0f
Author: saie-ch <[email protected]>
AuthorDate: Mon Mar 9 16:41:41 2026 +0530
feat(python): add TLS and auth args to getting-started examples (#2754)
closes #2809
---
.../actions/python-maturin/pre-merge/action.yml | 6 +-
examples/python/getting-started/consumer.py | 62 ++++++-
examples/python/getting-started/producer.py | 62 ++++++-
foreign/python/tests/conftest.py | 2 +-
foreign/python/tests/test_tls.py | 188 +++++++++++++++++++++
scripts/run-python-examples-from-readme.sh | 72 +++++++-
6 files changed, 373 insertions(+), 19 deletions(-)
diff --git a/.github/actions/python-maturin/pre-merge/action.yml b/.github/actions/python-maturin/pre-merge/action.yml
index ee0d08bab..bb899f0d6 100644
--- a/.github/actions/python-maturin/pre-merge/action.yml
+++ b/.github/actions/python-maturin/pre-merge/action.yml
@@ -53,11 +53,7 @@ runs:
shell: bash
run: |
cd foreign/python
- if [ "${{ inputs.task }}" = "test" ]; then
- uv sync --frozen --extra dev --extra testing --extra testing-docker
- else
- uv sync --frozen --extra dev --extra testing
- fi
+ uv sync --frozen --extra dev --extra testing --extra testing-docker
- name: Lint and format check
if: inputs.task == 'lint'
diff --git a/examples/python/getting-started/consumer.py b/examples/python/getting-started/consumer.py
old mode 100644
new mode 100755
index 30a78d5ba..2c23ec372
--- a/examples/python/getting-started/consumer.py
+++ b/examples/python/getting-started/consumer.py
@@ -32,7 +32,9 @@ TOPIC_ID = 0
PARTITION_ID = 0
BATCHES_LIMIT = 5
-ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address"])
+ArgNamespace = namedtuple(
+ "ArgNamespace", ["tcp_server_address", "tls", "tls_ca_file", "username", "password"]
+)
class ValidateUrl(argparse.Action):
@@ -57,18 +59,66 @@ def parse_args():
action=ValidateUrl,
default="127.0.0.1:8090",
)
- return parser.parse_args()
+ parser.add_argument(
+ "--tls",
+ action="store_true",
+ default=False,
+ help="Enable TLS for TCP connection",
+ )
+ parser.add_argument(
+ "--tls-ca-file",
+ default="",
+ help="Path to TLS CA certificate file",
+ )
+ parser.add_argument(
+ "--username",
+ default="iggy",
+ help="Username for authentication",
+ )
+ parser.add_argument(
+ "--password",
+ default="iggy",
+ help="Password for authentication",
+ )
+ args = parser.parse_args()
+
+ # Validate TLS requirements
+ if args.tls and not args.tls_ca_file:
+ parser.error("--tls requires --tls-ca-file")
+
+ return ArgNamespace(**vars(args))
+
+
+def build_connection_string(args) -> str:
+ """Build a connection string with TLS support."""
+
+ conn_str = f"iggy://{args.username}:{args.password}@{args.tcp_server_address}"
+
+ if args.tls:
+ # Extract domain from server address (host:port -> host)
+ host = args.tcp_server_address.split(":")[0]
+ query_params = ["tls=true", f"tls_domain={host}"]
+
+ # Add CA file if provided
+ if args.tls_ca_file:
+ query_params.append(f"tls_ca_file={args.tls_ca_file}")
+ conn_str += "?" + "&".join(query_params)
+
+ return conn_str
async def main():
args: ArgNamespace = parse_args()
- client = IggyClient(args.tcp_server_address)
+
+ # Build connection string with TLS support
+ connection_string = build_connection_string(args)
+ logger.info(f"Connection string: {connection_string}")
+
+ client = IggyClient.from_connection_string(connection_string)
try:
logger.info("Connecting to IggyClient...")
await client.connect()
- logger.info("Connected. Logging in user...")
- await client.login_user("iggy", "iggy")
- logger.info("Logged in.")
+ logger.info("Connected.")
await consume_messages(client)
except Exception as error:
logger.exception("Exception occurred in main function: {}", error)
diff --git a/examples/python/getting-started/producer.py b/examples/python/getting-started/producer.py
old mode 100644
new mode 100755
index 36445bfac..94225a416
--- a/examples/python/getting-started/producer.py
+++ b/examples/python/getting-started/producer.py
@@ -33,7 +33,9 @@ TOPIC_ID = 0
PARTITION_ID = 0
BATCHES_LIMIT = 5
-ArgNamespace = namedtuple("ArgNamespace", ["tcp_server_address"])
+ArgNamespace = namedtuple(
+ "ArgNamespace", ["tcp_server_address", "tls", "tls_ca_file", "username", "password"]
+)
class ValidateUrl(argparse.Action):
@@ -58,17 +60,65 @@ def parse_args():
action=ValidateUrl,
default="127.0.0.1:8090",
)
- return parser.parse_args()
+ parser.add_argument(
+ "--tls",
+ action="store_true",
+ default=False,
+ help="Enable TLS for TCP connection",
+ )
+ parser.add_argument(
+ "--tls-ca-file",
+ default="",
+ help="Path to TLS CA certificate file",
+ )
+ parser.add_argument(
+ "--username",
+ default="iggy",
+ help="Username for authentication",
+ )
+ parser.add_argument(
+ "--password",
+ default="iggy",
+ help="Password for authentication",
+ )
+ args = parser.parse_args()
+
+ # Validate TLS requirements
+ if args.tls and not args.tls_ca_file:
+ parser.error("--tls requires --tls-ca-file")
+
+ return ArgNamespace(**vars(args))
+
+
+def build_connection_string(args) -> str:
+ """Build a connection string with TLS support."""
+
+ conn_str = f"iggy://{args.username}:{args.password}@{args.tcp_server_address}"
+
+ if args.tls:
+ # Extract domain from server address (host:port -> host)
+ host = args.tcp_server_address.split(":")[0]
+ query_params = ["tls=true", f"tls_domain={host}"]
+
+ # Add CA file if provided
+ if args.tls_ca_file:
+ query_params.append(f"tls_ca_file={args.tls_ca_file}")
+ conn_str += "?" + "&".join(query_params)
+
+ return conn_str
async def main():
args: ArgNamespace = parse_args()
- client = IggyClient(args.tcp_server_address)
+ # Build connection string with TLS support
+ connection_string = build_connection_string(args)
+ logger.info(f"Connection string: {connection_string}")
+ logger.info(f"Connecting to {args.tcp_server_address} (TLS: {args.tls})")
+
+ client = IggyClient.from_connection_string(connection_string)
logger.info("Connecting to IggyClient")
await client.connect()
- logger.info("Connected. Logging in user...")
- await client.login_user("iggy", "iggy")
- logger.info("Logged in.")
+ logger.info("Connected.")
await init_system(client)
await produce_messages(client)
diff --git a/foreign/python/tests/conftest.py b/foreign/python/tests/conftest.py
index 578d0bf29..8e2031b60 100644
--- a/foreign/python/tests/conftest.py
+++ b/foreign/python/tests/conftest.py
@@ -85,5 +85,5 @@ def pytest_collection_modifyitems(config, items):
"""Modify test collection to add markers automatically."""
for item in items:
# Mark all tests in test_iggy_sdk.py as integration tests
- if "test_iggy_sdk" in item.nodeid:
+ if "test_iggy_sdk" in item.nodeid or "test_tls" in item.nodeid:
item.add_marker(pytest.mark.integration)
diff --git a/foreign/python/tests/test_tls.py b/foreign/python/tests/test_tls.py
new file mode 100644
index 000000000..34c2e5c32
--- /dev/null
+++ b/foreign/python/tests/test_tls.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Integration tests for TLS connectivity using testcontainers.
+
+These tests spin up a TLS-enabled Iggy server in a Docker container
+so they are fully self-contained and work in CI without a pre-running
+TLS server.
+
+Requirements:
+ - Docker running locally
+ - testcontainers[docker] installed (in [testing-docker] extras)
+ - CA certificate available at core/certs/iggy_ca_cert.pem
+"""
+
+import asyncio
+import os
+import uuid
+
+import pytest
+from apache_iggy import IggyClient, PollingStrategy
+from apache_iggy import SendMessage as Message
+from testcontainers.core.container import DockerContainer # type: ignore[import-untyped]
+from testcontainers.core.waiting_utils import wait_for_logs # type: ignore[import-untyped]
+
+from .utils import wait_for_ping, wait_for_server
+
+# Paths resolved relative to this file → repo root
+REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
+CERTS_DIR = os.path.join(REPO_ROOT, "core", "certs")
+CA_FILE = os.path.join(CERTS_DIR, "iggy_ca_cert.pem")
+
+IGGY_IMAGE = os.environ.get("IGGY_SERVER_DOCKER_IMAGE", "apache/iggy:edge")
+CONTAINER_TCP_PORT = 8090
+
+
+@pytest.fixture(scope="module")
+def tls_container():
+ """Start a TLS-enabled Iggy server in Docker."""
+ container = (
+ DockerContainer(IGGY_IMAGE)
+ .with_exposed_ports(CONTAINER_TCP_PORT)
+ .with_env("IGGY_ROOT_USERNAME", "iggy")
+ .with_env("IGGY_ROOT_PASSWORD", "iggy")
+ .with_env("IGGY_TCP_TLS_ENABLED", "true")
+ .with_env("IGGY_TCP_TLS_CERT_FILE", "/app/certs/iggy_cert.pem")
+ .with_env("IGGY_TCP_TLS_KEY_FILE", "/app/certs/iggy_key.pem")
+ .with_env("IGGY_TCP_ADDRESS", f"0.0.0.0:{CONTAINER_TCP_PORT}")
+ .with_volume_mapping(CERTS_DIR, "/app/certs", "ro")
+ .with_kwargs(privileged=True)
+ )
+ container.start()
+ # Wait for the server to be ready inside the container
+ wait_for_logs(container, "Iggy server is running", timeout=60)
+ yield container
+ container.stop()
+
+
+@pytest.fixture(scope="module")
+async def tls_client(tls_container) -> IggyClient:
+ """Create an authenticated client connected to the TLS container."""
+ host = "localhost"
+ port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)
+
+ wait_for_server(host, int(port))
+
+ conn_str = (
+ f"iggy+tcp://iggy:iggy@{host}:{port}"
+ f"?tls=true&tls_domain={host}&tls_ca_file={CA_FILE}"
+ )
+ client = IggyClient.from_connection_string(conn_str)
+ await client.connect()
+ await wait_for_ping(client)
+ return client
+
+
+@pytest.mark.integration
+class TestTlsConnectivity:
+ """Test TLS connection establishment and basic operations."""
+
+ @pytest.mark.asyncio
+ async def test_ping_over_tls(self, tls_client: IggyClient):
+ """Test that the server responds to ping over a TLS connection."""
+ await tls_client.ping()
+
+ @pytest.mark.asyncio
+ async def test_client_not_none(self, tls_client: IggyClient):
+ """Test that the TLS client fixture is properly initialized."""
+ assert tls_client is not None
+
+ @pytest.mark.asyncio
+ async def test_create_stream_over_tls(self, tls_client: IggyClient):
+ """Test creating and getting a stream over TLS."""
+ stream_name = f"tls-test-stream-{uuid.uuid4().hex[:8]}"
+ await tls_client.create_stream(stream_name)
+ stream = await tls_client.get_stream(stream_name)
+ assert stream is not None
+
+ @pytest.mark.asyncio
+ async def test_produce_and_consume_over_tls(self, tls_client: IggyClient):
+ """Test producing and consuming messages over TLS."""
+ stream_name = f"tls-msg-stream-{uuid.uuid4().hex[:8]}"
+ topic_name = "tls-test-topic"
+ partition_id = 0
+
+ # Create stream and topic
+ await tls_client.create_stream(stream_name)
+ await tls_client.create_topic(stream_name, topic_name, partitions_count=1)
+
+ # Produce messages
+ test_messages = [f"tls-message-{i}" for i in range(3)]
+ messages = [Message(msg) for msg in test_messages]
+ await tls_client.send_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partitioning=partition_id,
+ messages=messages,
+ )
+
+ # Consume messages
+ polled = await tls_client.poll_messages(
+ stream=stream_name,
+ topic=topic_name,
+ partition_id=partition_id,
+ polling_strategy=PollingStrategy.First(),
+ count=10,
+ auto_commit=True,
+ )
+ assert len(polled) >= len(test_messages)
+
+ # Verify message payloads
+ for i, expected_msg in enumerate(test_messages):
+ if i < len(polled):
+ assert polled[i].payload().decode("utf-8") == expected_msg
+
+
+@pytest.mark.integration
+class TestTlsConnectionString:
+ """Test TLS connection string variations."""
+
+ @pytest.mark.asyncio
+ async def test_connection_string_with_tls_params(self, tls_container):
+ """Test creating a client from a connection string with TLS parameters."""
+ host = "localhost"
+ port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)
+
+ wait_for_server(host, int(port))
+
+ conn_str = (
+ f"iggy+tcp://iggy:iggy@{host}:{port}"
+ f"?tls=true&tls_domain={host}&tls_ca_file={CA_FILE}"
+ )
+ client = IggyClient.from_connection_string(conn_str)
+ await client.connect()
+ await wait_for_ping(client)
+ await client.ping()
+
+ @pytest.mark.asyncio
+ @pytest.mark.timeout(10)
+ async def test_connect_without_tls_should_fail(self, tls_container):
+ """Test that connecting without TLS to a TLS-enabled server fails."""
+ host = "localhost"
+ port = tls_container.get_exposed_port(CONTAINER_TCP_PORT)
+
+ wait_for_server(host, int(port))
+
+ # Use connection string without TLS params to a TLS-enabled server
+ # The SDK retries internally, so we use a timeout to detect failure
+ conn_str = f"iggy+tcp://iggy:iggy@{host}:{port}"
+ client = IggyClient.from_connection_string(conn_str)
+
+ with pytest.raises(Exception):
+ await asyncio.wait_for(client.connect(), timeout=5)
diff --git a/scripts/run-python-examples-from-readme.sh b/scripts/run-python-examples-from-readme.sh
index bb601d733..08237c6e0 100755
--- a/scripts/run-python-examples-from-readme.sh
+++ b/scripts/run-python-examples-from-readme.sh
@@ -133,13 +133,83 @@ if [ -f "README.md" ]; then
done < <(grep -E "^uv run " "README.md")
fi
+# --- TLS Test Pass ---
+if [ "${exit_code}" -eq 0 ]; then
+ echo ""
+ echo -e "\e[36m=== Starting TLS test pass ===\e[0m"
+ echo ""
+
+ # Go back to repo root to manage server
+ cd ../..
+
+ # Stop non-TLS server
+ kill -TERM "$(cat ${PID_FILE})"
+ sleep 2
+
+ # Clean data and logs for fresh TLS start
+ rm -fr local_data
+ rm -f ${LOG_FILE}
+
+ # Start server with TLS enabled
+ echo "Starting server with TLS enabled..."
+ IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy IGGY_TCP_TLS_ENABLED=true ${SERVER_BIN} --fresh &>${LOG_FILE} &
+ echo $! >${PID_FILE}
+
+ # Wait for server to start
+ SERVER_START_TIME=0
+ while ! grep -q "has started" ${LOG_FILE}; do
+ if [ ${SERVER_START_TIME} -gt ${TIMEOUT} ]; then
+ echo "TLS server did not start within ${TIMEOUT} seconds."
+ ps fx
+ cat ${LOG_FILE}
+ exit 1
+ fi
+ echo "Waiting for Iggy TLS server to start... ${SERVER_START_TIME}"
+ sleep 1
+ ((SERVER_START_TIME += 1))
+ done
+
+ cd examples/python || exit 1
+ # shellcheck disable=SC1091
+ source .venv/bin/activate
+
+ # Run getting-started examples with TLS flags
+ # Use localhost instead of 127.0.0.1 to match the cert's SAN (DNS:localhost)
+ TLS_CA_FILE="../../core/certs/iggy_ca_cert.pem"
+ TLS_ADDR="localhost:8090"
+
+ for cmd in \
+ "python getting-started/producer.py --tcp-server-address ${TLS_ADDR} --tls --tls-ca-file ${TLS_CA_FILE}" \
+ "python getting-started/consumer.py --tcp-server-address ${TLS_ADDR} --tls --tls-ca-file ${TLS_CA_FILE}"; do
+
+ echo -e "\e[33mChecking TLS example:\e[0m ${cmd}"
+ echo ""
+
+ set +e
+ eval "timeout 10 ${cmd}"
+ test_exit_code=$?
+ set -e
+
+ if [[ $test_exit_code -ne 0 && $test_exit_code -ne 124 ]]; then
+ echo ""
+ echo -e "\e[31mTLS example command failed:\e[0m ${cmd}"
+ echo ""
+ exit_code=$test_exit_code
+ break
+ fi
+ sleep 2
+ done
+
+fi
+
# Clean up virtual environment
+deactivate 2>/dev/null || true
rm -rf .venv
cd ../..
# Terminate server
-kill -TERM "$(cat ${PID_FILE})"
+kill -TERM "$(cat ${PID_FILE})" 2>/dev/null || true
test -e ${PID_FILE} && rm ${PID_FILE}
# If everything is ok remove log and pid files otherwise cat server log