mmodzelewski commented on code in PR #2781:
URL: https://github.com/apache/iggy/pull/2781#discussion_r2894275099
##########
core/connectors/runtime/src/manager/sink.rs:
##########
@@ -96,6 +107,154 @@ impl SinkManager {
sink.info.last_error = Some(ConnectorError::new(error_message));
}
}
+
+ pub async fn stop_connector(
+ &self,
+ key: &str,
+ metrics: &Arc<Metrics>,
+ ) -> Result<(), RuntimeError> {
+ let details_arc = self
+ .sinks
+ .get(key)
+ .map(|e| e.value().clone())
+ .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+ let (shutdown_tx, task_handles, plugin_id, container) = {
+ let mut details = details_arc.lock().await;
+ (
+ details.shutdown_tx.take(),
+ std::mem::take(&mut details.task_handles),
+ details.info.id,
+ details.container.clone(),
+ )
+ };
+
+ if let Some(tx) = shutdown_tx {
+ let _ = tx.send(());
+ }
+
+ for handle in task_handles {
+ let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
+ }
+
+ if let Some(container) = &container {
+ info!("Closing sink connector with ID: {plugin_id} for plugin:
{key}");
+ (container.iggy_sink_close)(plugin_id);
+ info!("Closed sink connector with ID: {plugin_id} for plugin:
{key}");
+ }
+
+ {
+ let mut details = details_arc.lock().await;
+ let old_status = details.info.status;
+ details.info.status = ConnectorStatus::Stopped;
+ details.info.last_error = None;
+ if old_status == ConnectorStatus::Running {
+ metrics.decrement_sinks_running();
+ }
+ }
+
+ Ok(())
+ }
+
+ pub async fn start_connector(
+ &self,
+ key: &str,
+ config: &SinkConfig,
+ iggy_client: &IggyClient,
+ metrics: &Arc<Metrics>,
+ ) -> Result<(), RuntimeError> {
+ let details_arc = self
+ .sinks
+ .get(key)
+ .map(|e| e.value().clone())
+ .ok_or_else(|| RuntimeError::SinkNotFound(key.to_string()))?;
+
+ let container = {
+ let details = details_arc.lock().await;
+ details.container.clone().ok_or_else(|| {
+ RuntimeError::InvalidConfiguration(format!("No container
loaded for sink: {key}"))
+ })?
+ };
+
+ let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
+
+ sink::init_sink(
+ &container,
+ &config.plugin_config.clone().unwrap_or_default(),
+ plugin_id,
+ )?;
+ info!("Sink connector with ID: {plugin_id} for plugin: {key}
initialized successfully.");
+
+ let consumers = sink::setup_sink_consumers(key, config,
iggy_client).await?;
+
+ let (shutdown_tx, shutdown_rx) = watch::channel(());
+ let callback = container.iggy_sink_consume;
+ let verbose = config.verbose;
+ let mut task_handles = Vec::new();
+
+ for (consumer, decoder, batch_size, transforms) in consumers {
+ let plugin_key = key.to_string();
+ let metrics_clone = metrics.clone();
+ let shutdown_rx = shutdown_rx.clone();
+
+ let handle = tokio::spawn(async move {
+ if let Err(error) = sink::consume_messages(
+ plugin_id,
+ decoder,
+ batch_size,
+ callback,
+ transforms,
+ consumer,
+ verbose,
+ &plugin_key,
+ &metrics_clone,
+ shutdown_rx,
+ )
+ .await
+ {
+ error!(
+ "Failed to consume messages for sink connector with
ID: {plugin_id}: {error}"
+ );
+ }
+ });
+ task_handles.push(handle);
+ }
+
+ {
+ let mut details = details_arc.lock().await;
+ details.info.id = plugin_id;
+ details.info.status = ConnectorStatus::Running;
+ details.info.last_error = None;
+ details.config = config.clone();
+ details.shutdown_tx = Some(shutdown_tx);
+ details.task_handles = task_handles;
+ metrics.increment_sinks_running();
+ }
+
+ Ok(())
+ }
+
+ pub async fn restart_connector(
Review Comment:
Maybe we could do `try_lock` instead? Currently, the requests are getting
queued, which might not be desired. If the restart is already in progress, then
we just return OK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]