This is an automated email from the ASF dual-hosted git repository. marat pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 9dd129045b449e58c0fab1aa0308e128dc3a009d Author: Marat Gubaidullin <marat.gubaidul...@gmail.com> AuthorDate: Tue May 2 21:07:05 2023 -0400 SSE backend api for containers #563 --- .../camel/karavan/api/KubernetesResource.java | 8 +- .../apache/camel/karavan/api/LogWatchResource.java | 85 +++++++++++++++++++++ .../apache/camel/karavan/model/PipelineRunLog.java | 27 ------- .../camel/karavan/service/KubernetesService.java | 89 ++++++++++------------ .../src/main/webui/src/projects/ProjectLog.tsx | 6 +- 5 files changed, 130 insertions(+), 85 deletions(-) diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java index 60733747..fee4841c 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/KubernetesResource.java @@ -29,13 +29,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.jboss.logging.Logger; import javax.inject.Inject; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.Comparator; diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java new file mode 100644 index 00000000..5fa14161 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/LogWatchResource.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.api; + +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.smallrye.mutiny.tuples.Tuple2; +import org.apache.camel.karavan.service.KubernetesService; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.context.ManagedExecutor; +import org.jboss.logging.Logger; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseEventSink; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +@Path("/api/logwatch") +public class LogWatchResource { + + private static final Logger LOGGER = Logger.getLogger(LogWatchResource.class.getName()); + private static final ConcurrentHashMap<String, LogWatch> logWatches = new ConcurrentHashMap<>(); + + @Inject + KubernetesService kubernetesService; + + @ConfigProperty(name = "karavan.environment") + String environment; + + + @Inject + ManagedExecutor managedExecutor; + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @Path("/{type}/{env}/{name}") + public void eventSourcing(@PathParam("env") String env, + @PathParam("type") String type, + @PathParam("name") String name, + @Context SseEventSink eventSink, + @Context Sse sse + ) { + managedExecutor.execute(() -> { + try (SseEventSink sink = eventSink) { + LogWatch logWatch = kubernetesService.getLogWatch(name); + BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput())); + try { + for (String line; (line = reader.readLine()) != null && !sink.isClosed(); ) { + sink.send(sse.newEvent(line + System.lineSeparator())); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + if (sink.isClosed()) { + logWatch.close(); + LOGGER.info("LogWatch for " + name + " closed"); + } + } + }); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java b/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java deleted file mode 100644 index 307a12e5..00000000 --- a/karavan-app/src/main/java/org/apache/camel/karavan/model/PipelineRunLog.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.camel.karavan.model; - -public class PipelineRunLog { - private String task; - private String log; - - public PipelineRunLog(String task, String log) { - this.task = task; - this.log = log; - } - - public String getTask() { - return task; - } - - public void setTask(String task) { - this.task = task; - } - - public String getLog() { - return log; - } - - public void setLog(String log) { - this.log = log; - } -} diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java index 165e800b..13b4484c 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/KubernetesService.java @@ -17,11 +17,7 @@ package org.apache.camel.karavan.service; import io.fabric8.knative.internal.pkg.apis.Condition; -import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.*; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; @@ -30,23 +26,14 @@ import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.openshift.api.model.ImageStream; import io.fabric8.openshift.client.OpenShiftClient; import io.fabric8.tekton.client.DefaultTektonClient; -import io.fabric8.tekton.pipeline.v1beta1.ParamBuilder; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRef; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRefBuilder; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRun; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRunBuilder; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpec; -import io.fabric8.tekton.pipeline.v1beta1.PipelineRunSpecBuilder; -import io.fabric8.tekton.pipeline.v1beta1.TaskRun; -import io.fabric8.tekton.pipeline.v1beta1.WorkspaceBindingBuilder; +import io.fabric8.tekton.pipeline.v1beta1.*; import io.quarkus.vertx.ConsumeEvent; import io.vertx.mutiny.core.eventbus.EventBus; -import org.apache.camel.karavan.informer.ServiceEventHandler; -import org.apache.camel.karavan.model.PipelineRunLog; -import org.apache.camel.karavan.model.Project; import org.apache.camel.karavan.informer.DeploymentEventHandler; import org.apache.camel.karavan.informer.PipelineRunEventHandler; import org.apache.camel.karavan.informer.PodEventHandler; +import org.apache.camel.karavan.informer.ServiceEventHandler; +import org.apache.camel.karavan.model.Project; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; @@ -57,13 +44,7 @@ import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.Default; import javax.enterprise.inject.Produces; import javax.inject.Inject; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; @@ -103,8 +84,7 @@ public class KubernetesService implements HealthCheck{ @ConfigProperty(name = "karavan.environment") public String environment; - - List<SharedIndexInformer> informers = new ArrayList<>(3); + List<SharedIndexInformer> informers = new ArrayList<>(4); @ConsumeEvent(value = START_INFORMERS, blocking = true) void startInformers(String data) { @@ -205,34 +185,49 @@ public class KubernetesService implements HealthCheck{ return logText; } - // TODO: implement log watch - public void startContainerLogWatch(String podName, String namespace) { - LogWatch logWatch = kubernetesClient().pods().inNamespace(namespace).withName(podName).watchLog(); - InputStream is = logWatch.getOutput(); - Integer i; - try { - while ((i = is.available()) != null) { - eventBus.publish(podName + "-" + namespace, new String(is.readNBytes(i))); - } - } catch (IOException e) { - LOGGER.error(e); - } - } - - public List<PipelineRunLog> getPipelineRunLog(String pipelineRuneName, String namespace) { - List<PipelineRunLog> result = new ArrayList<>(1); + public LogWatch getLogWatch(String podName) { + return kubernetesClient().pods().inNamespace(getNamespace()).withName(podName).watchLog(); + } +// public void startContainerLogWatch(String session, String podName) { +// Tuple2<CompletableFuture<Void>, LogWatch> old = logWatches.get(session); +// if (old != null) { +// LOGGER.info("Closing old"); +// old.getItem1().cancel(true); +// old.getItem2().close(); +// logWatches.remove(session); +// LOGGER.info("Closed old"); +// } +// +// LOGGER.info("Starting startContainerLogWatch"); +// CompletableFuture<Void> future = managedExecutor.runAsync(() -> { +// LogWatch logWatch = +// BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput())); +// try { +// for (String line; (line = reader.readLine()) != null; ) { +// eventBus.publish(session, System.lineSeparator()); +// eventBus.publish(session, line); +// System.out.println(line); +// } +// } catch (IOException e) { +// LOGGER.error(e.getMessage()); +// } +// }); +// logWatches.put(session, Tuple2.of(future, logWatch)); +// LOGGER.info("Done startContainerLogWatch"); +// } + + public String getPipelineRunLog(String pipelineRuneName, String namespace) { + StringBuilder result = new StringBuilder(); getTaskRuns(pipelineRuneName, namespace).forEach(taskRun -> { String podName = taskRun.getStatus().getPodName(); - StringBuilder log = new StringBuilder(); taskRun.getStatus().getSteps().forEach(stepState -> { String logText = kubernetesClient().pods().inNamespace(namespace).withName(podName).inContainer(stepState.getContainer()).getLog(true); - log.append(stepState.getContainer()).append(System.lineSeparator()); - log.append(logText).append(System.lineSeparator()); + result.append(stepState.getContainer()).append(System.lineSeparator()); + result.append(logText).append(System.lineSeparator()); }); - result.add(new PipelineRunLog(taskRun.getMetadata().getName(), log.toString())); }); - return result; + return result.toString(); } public PipelineRun getLastPipelineRun(String projectId, String pipelineName, String namespace) { diff --git a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx index 3c706444..2b81dda5 100644 --- a/karavan-app/src/main/webui/src/projects/ProjectLog.tsx +++ b/karavan-app/src/main/webui/src/projects/ProjectLog.tsx @@ -31,7 +31,7 @@ export class ProjectLog extends React.Component<Props, State> { showLog: false, height: "30%", logViewerRef: React.createRef(), - isTextWrapped: false, + isTextWrapped: true, data: [] } @@ -40,7 +40,6 @@ export class ProjectLog extends React.Component<Props, State> { componentDidMount() { this.sub = ProjectEventBus.onShowLog()?.subscribe((log: ShowLogCommand) => { this.setState({showLog: true, log: log}); - console.log(log) this.showLogs(log.type, log.name, log.environment); }); } @@ -52,8 +51,7 @@ export class ProjectLog extends React.Component<Props, State> { showLogs = (type: 'container' | 'pipeline', name: string, environment: string) => { if (type === 'pipeline') { KaravanApi.getPipelineLog(environment, name, (res: any) => { - if (Array.isArray(res) && Array.from(res).length > 0) - this.setState({data: res.at(0).log}); + this.setState({data: res}); }); } else if (type === 'container') { KaravanApi.getContainerLog(environment, name, (res: any) => {