This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-21905-poll-timeout in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3a6383ff60171553e33f2d78d06d4bbad8e3ae69 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Apr 1 11:42:29 2025 +0200 CAMEL-21905 - Camel-IBM-Secrets-Manager: Implement secret refresh by using IBM Event Streams as destination of notifications - Add poll timeout parameter Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- .../catalog/main/camel-main-configuration-metadata.json | 1 + .../manager/vault/IBMEventStreamReloadTriggerTask.java | 14 ++++++++++---- .../camel/vault/IBMSecretsManagerVaultConfiguration.java | 13 +++++++++++++ .../IBMSecretsManagerVaultConfigurationConfigurer.java | 6 ++++++ ...cretsManagerVaultConfigurationPropertiesConfigurer.java | 7 +++++++ .../META-INF/camel-main-configuration-metadata.json | 1 + core/camel-main/src/main/docs/main.adoc | 3 ++- 7 files changed, 40 insertions(+), 5 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json index a7b979a3db1..8db967e7b0c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/camel-main-configuration-metadata.json @@ -393,6 +393,7 @@ { "name": "camel.vault.hashicorp.scheme", "description": "Scheme to access hashicorp vault", "sourceType": "org.apache.camel.vault.HashicorpVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.hashicorp.token", "description": "Token to access hashicorp vault", "sourceType": "org.apache.camel.vault.HashicorpVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamBootstrapServers", "description": "Specify the Bootstrap servers for consuming notification on IBM Event Stream. Multiple servers can be separated by comma.", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, + { "name": "camel.vault.ibm.eventStreamConsumerPollTimeout", "description": "Specify the Consumer Poll Timeout while consuming from IBM Event Stream Topic", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "integer", "javaType": "long", "defaultValue": 3000 }, { "name": "camel.vault.ibm.eventStreamGroupId", "description": "Specify the Consumer Group ID to access IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamPassword", "description": "Specify the password to access IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamTopic", "description": "Specify the topic name for consuming notification on IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, diff --git a/components/camel-ibm-secrets-manager/src/main/java/org/apache/camel/component/ibm/secrets/manager/vault/IBMEventStreamReloadTriggerTask.java b/components/camel-ibm-secrets-manager/src/main/java/org/apache/camel/component/ibm/secrets/manager/vault/IBMEventStreamReloadTriggerTask.java index 21b010c2626..a5ae92e6e3e 100644 --- a/components/camel-ibm-secrets-manager/src/main/java/org/apache/camel/component/ibm/secrets/manager/vault/IBMEventStreamReloadTriggerTask.java +++ b/components/camel-ibm-secrets-manager/src/main/java/org/apache/camel/component/ibm/secrets/manager/vault/IBMEventStreamReloadTriggerTask.java @@ -29,7 +29,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; -import com.ibm.cloud.secrets_manager_sdk.secrets_manager.v2.SecretsManager; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.RuntimeCamelException; @@ -65,11 +64,11 @@ public class IBMEventStreamReloadTriggerTask extends ServiceSupport implements C private static final String CAMEL_VAULT_IBM_EVENTSTREAM_PASSWORD_ENV = "CAMEL_VAULT_IBM_EVENTSTREAM_PASSWORD"; private static final String CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_GROUPID_ENV = "CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_GROUP_ID"; + private static final String CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_POLL_TIMEOUT_ENV + = "CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_POLL_TIMEOUT"; private CamelContext camelContext; - private SecretsManager client; - private String secretGroup; private boolean reloadEnabled = true; private String secrets; private IBMSecretsManagerPropertiesFunction propertiesFunction; @@ -79,6 +78,7 @@ public class IBMEventStreamReloadTriggerTask extends ServiceSupport implements C private final Map<String, Instant> updates = new HashMap<>(); KafkaConsumer<String, String> kafkaConsumer; private static final String IBM_SECRETS_MANAGER_SECRET_ROTATED_EVENT = "secret_rotated"; + protected long pollTimeout; private static final Logger LOG = LoggerFactory.getLogger(IBMEventStreamReloadTriggerTask.class); @@ -104,6 +104,12 @@ public class IBMEventStreamReloadTriggerTask extends ServiceSupport implements C String topic = System.getenv(CAMEL_VAULT_IBM_EVENTSTREAM_TOPIC_ENV); String username = System.getenv(CAMEL_VAULT_IBM_EVENTSTREAM_USERNAME_ENV); String password = System.getenv(CAMEL_VAULT_IBM_EVENTSTREAM_PASSWORD_ENV); + if (ObjectHelper.isNotEmpty(System.getenv(CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_POLL_TIMEOUT_ENV))) { + pollTimeout = Long.parseLong(System.getenv(CAMEL_VAULT_IBM_EVENTSTREAM_CONSUMER_POLL_TIMEOUT_ENV)); + } else { + pollTimeout = getCamelContext().getVaultConfiguration().getIBMSecretsManagerVaultConfiguration() + .getEventStreamConsumerPollTimeout(); + } if (ObjectHelper.isEmpty(bootstrapServers) && ObjectHelper.isEmpty(groupId) && ObjectHelper.isEmpty(topic) && ObjectHelper.isEmpty(password)) { @@ -156,7 +162,7 @@ public class IBMEventStreamReloadTriggerTask extends ServiceSupport implements C ObjectMapper mapper = new ObjectMapper(); while (true) { - ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); + ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(pollTimeout)); for (ConsumerRecord<String, String> record : records) { JsonNode recordJson; diff --git a/core/camel-api/src/main/java/org/apache/camel/vault/IBMSecretsManagerVaultConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/vault/IBMSecretsManagerVaultConfiguration.java index e1ef7fb199b..0a8cdcc30af 100644 --- a/core/camel-api/src/main/java/org/apache/camel/vault/IBMSecretsManagerVaultConfiguration.java +++ b/core/camel-api/src/main/java/org/apache/camel/vault/IBMSecretsManagerVaultConfiguration.java @@ -41,6 +41,8 @@ public class IBMSecretsManagerVaultConfiguration extends VaultConfiguration { private String eventStreamPassword; @Metadata private String eventStreamGroupId; + @Metadata(defaultValue = "3000") + private long eventStreamConsumerPollTimeout = 3000; public String getToken() { return token; @@ -141,4 +143,15 @@ public class IBMSecretsManagerVaultConfiguration extends VaultConfiguration { public void setEventStreamGroupId(String eventStreamGroupId) { this.eventStreamGroupId = eventStreamGroupId; } + + public long getEventStreamConsumerPollTimeout() { + return eventStreamConsumerPollTimeout; + } + + /** + * Specify the Consumer Poll Timeout while consuming from IBM Event Stream Topic + */ + public void setEventStreamConsumerPollTimeout(long eventStreamConsumerPollTimeout) { + this.eventStreamConsumerPollTimeout = eventStreamConsumerPollTimeout; + } } diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationConfigurer.java index 352ccf43d03..6cb28ea5990 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationConfigurer.java @@ -29,6 +29,8 @@ public class IBMSecretsManagerVaultConfigurationConfigurer extends org.apache.ca case "azureVaultConfiguration": target.setAzureVaultConfiguration(property(camelContext, org.apache.camel.vault.AzureVaultConfiguration.class, value)); return true; case "eventstreambootstrapservers": case "eventStreamBootstrapServers": target.setEventStreamBootstrapServers(property(camelContext, java.lang.String.class, value)); return true; + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": target.setEventStreamConsumerPollTimeout(property(camelContext, long.class, value)); return true; case "eventstreamgroupid": case "eventStreamGroupId": target.setEventStreamGroupId(property(camelContext, java.lang.String.class, value)); return true; case "eventstreampassword": @@ -66,6 +68,8 @@ public class IBMSecretsManagerVaultConfigurationConfigurer extends org.apache.ca case "azureVaultConfiguration": return org.apache.camel.vault.AzureVaultConfiguration.class; case "eventstreambootstrapservers": case "eventStreamBootstrapServers": return java.lang.String.class; + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": return long.class; case "eventstreamgroupid": case "eventStreamGroupId": return java.lang.String.class; case "eventstreampassword": @@ -104,6 +108,8 @@ public class IBMSecretsManagerVaultConfigurationConfigurer extends org.apache.ca case "azureVaultConfiguration": return target.getAzureVaultConfiguration(); case "eventstreambootstrapservers": case "eventStreamBootstrapServers": return target.getEventStreamBootstrapServers(); + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": return target.getEventStreamConsumerPollTimeout(); case "eventstreamgroupid": case "eventStreamGroupId": return target.getEventStreamGroupId(); case "eventstreampassword": diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationPropertiesConfigurer.java index 232bfd39def..a028422bcba 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationPropertiesConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/IBMSecretsManagerVaultConfigurationPropertiesConfigurer.java @@ -25,6 +25,7 @@ public class IBMSecretsManagerVaultConfigurationPropertiesConfigurer extends org map.put("AwsVaultConfiguration", org.apache.camel.vault.AwsVaultConfiguration.class); map.put("AzureVaultConfiguration", org.apache.camel.vault.AzureVaultConfiguration.class); map.put("EventStreamBootstrapServers", java.lang.String.class); + map.put("EventStreamConsumerPollTimeout", long.class); map.put("EventStreamGroupId", java.lang.String.class); map.put("EventStreamPassword", java.lang.String.class); map.put("EventStreamTopic", java.lang.String.class); @@ -51,6 +52,8 @@ public class IBMSecretsManagerVaultConfigurationPropertiesConfigurer extends org case "azureVaultConfiguration": target.setAzureVaultConfiguration(property(camelContext, org.apache.camel.vault.AzureVaultConfiguration.class, value)); return true; case "eventstreambootstrapservers": case "eventStreamBootstrapServers": target.setEventStreamBootstrapServers(property(camelContext, java.lang.String.class, value)); return true; + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": target.setEventStreamConsumerPollTimeout(property(camelContext, long.class, value)); return true; case "eventstreamgroupid": case "eventStreamGroupId": target.setEventStreamGroupId(property(camelContext, java.lang.String.class, value)); return true; case "eventstreampassword": @@ -93,6 +96,8 @@ public class IBMSecretsManagerVaultConfigurationPropertiesConfigurer extends org case "azureVaultConfiguration": return org.apache.camel.vault.AzureVaultConfiguration.class; case "eventstreambootstrapservers": case "eventStreamBootstrapServers": return java.lang.String.class; + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": return long.class; case "eventstreamgroupid": case "eventStreamGroupId": return java.lang.String.class; case "eventstreampassword": @@ -131,6 +136,8 @@ public class IBMSecretsManagerVaultConfigurationPropertiesConfigurer extends org case "azureVaultConfiguration": return target.getAzureVaultConfiguration(); case "eventstreambootstrapservers": case "eventStreamBootstrapServers": return target.getEventStreamBootstrapServers(); + case "eventstreamconsumerpolltimeout": + case "eventStreamConsumerPollTimeout": return target.getEventStreamConsumerPollTimeout(); case "eventstreamgroupid": case "eventStreamGroupId": return target.getEventStreamGroupId(); case "eventstreampassword": diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json index a7b979a3db1..8db967e7b0c 100644 --- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json @@ -393,6 +393,7 @@ { "name": "camel.vault.hashicorp.scheme", "description": "Scheme to access hashicorp vault", "sourceType": "org.apache.camel.vault.HashicorpVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.hashicorp.token", "description": "Token to access hashicorp vault", "sourceType": "org.apache.camel.vault.HashicorpVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamBootstrapServers", "description": "Specify the Bootstrap servers for consuming notification on IBM Event Stream. Multiple servers can be separated by comma.", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, + { "name": "camel.vault.ibm.eventStreamConsumerPollTimeout", "description": "Specify the Consumer Poll Timeout while consuming from IBM Event Stream Topic", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "integer", "javaType": "long", "defaultValue": 3000 }, { "name": "camel.vault.ibm.eventStreamGroupId", "description": "Specify the Consumer Group ID to access IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamPassword", "description": "Specify the password to access IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, { "name": "camel.vault.ibm.eventStreamTopic", "description": "Specify the topic name for consuming notification on IBM Event Stream", "sourceType": "org.apache.camel.vault.IBMSecretsManagerVaultConfiguration", "type": "string", "javaType": "java.lang.String" }, diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc index 44817f23212..e2f68315638 100644 --- a/core/camel-main/src/main/docs/main.adoc +++ b/core/camel-main/src/main/docs/main.adoc @@ -469,12 +469,13 @@ The camel.vault.hashicorp supports 6 options, which are listed below. === Camel IBM Secrets Manager Vault configurations -The camel.vault.ibm supports 9 options, which are listed below. +The camel.vault.ibm supports 10 options, which are listed below. [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type | *camel.vault.ibm.eventStream{zwsp}BootstrapServers* | Specify the Bootstrap servers for consuming notification on IBM Event Stream. Multiple servers can be separated by comma. | | String +| *camel.vault.ibm.eventStream{zwsp}ConsumerPollTimeout* | Specify the Consumer Poll Timeout while consuming from IBM Event Stream Topic | 3000 | long | *camel.vault.ibm.eventStream{zwsp}GroupId* | Specify the Consumer Group ID to access IBM Event Stream | | String | *camel.vault.ibm.eventStream{zwsp}Password* | Specify the password to access IBM Event Stream | | String | *camel.vault.ibm.eventStream{zwsp}Topic* | Specify the topic name for consuming notification on IBM Event Stream | | String