This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8c149  chore(log): make dev-mode work in Knative
6a8c149 is described below

commit 6a8c149d7803e8a348002e1597ddf053493ff63a
Author: nferraro <ni.ferr...@gmail.com>
AuthorDate: Tue Dec 11 17:55:04 2018 +0100

    chore(log): make dev-mode work in Knative
---
 pkg/util/log/pod_scraper.go | 54 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 39 insertions(+), 15 deletions(-)

diff --git a/pkg/util/log/pod_scraper.go b/pkg/util/log/pod_scraper.go
index ba4567b..c46f2be 100644
--- a/pkg/util/log/pod_scraper.go
+++ b/pkg/util/log/pod_scraper.go
@@ -34,6 +34,11 @@ import (
        "k8s.io/apimachinery/pkg/watch"
 )
 
+var commonUserContainerNames = map[string]bool{
+       // Convention used in Knative and Istio
+       "user-container": true,
+}
+
 // PodScraper scrapes logs of a specific pod
 type PodScraper struct {
        namespace string
@@ -62,13 +67,16 @@ func (s *PodScraper) Start(ctx context.Context) 
*bufio.Reader {
 }
 
 func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, 
clientCloser func() error) {
-       err := s.waitForPodRunning(ctx, s.namespace, s.name)
+       containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
        if err != nil {
                s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
                return
        }
-
-       byteReader, err := 
k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, 
&v1.PodLogOptions{Follow: true}).Context(ctx).Stream()
+       logOptions := v1.PodLogOptions{
+               Follow:    true,
+               Container: containerName,
+       }
+       byteReader, err := 
k8sclient.GetKubeClient().CoreV1().Pods(s.namespace).GetLogs(s.name, 
&logOptions).Context(ctx).Stream()
        if err != nil {
                s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
                return
@@ -100,14 +108,14 @@ func (s *PodScraper) handleAndRestart(ctx 
context.Context, err error, wait time.
        }
 
        if ctx.Err() != nil {
-               logrus.Info("Pod ", s.name, " will no longer be monitored")
+               logrus.Debug("Pod ", s.name, " will no longer be monitored")
                if err := clientCloser(); err != nil {
                        logrus.Warn("Unable to close the client", err)
                }
                return
        }
 
-       logrus.Info("Retrying to scrape pod ", s.name, " logs in ", 
wait.Seconds(), " seconds...")
+       logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", 
wait.Seconds(), " seconds...")
        select {
        case <-time.After(wait):
                break
@@ -121,8 +129,9 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, 
err error, wait time.
        s.doScrape(ctx, out, clientCloser)
 }
 
-// Waits for a given pod to reach the running state
-func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, 
name string) error {
+// waitForPodRunning waits for a given pod to reach the running state.
+// It may return the internal container to watch if present
+func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, 
name string) (string, error) {
        pod := v1.Pod{
                TypeMeta: metav1.TypeMeta{
                        Kind:       "Pod",
@@ -135,22 +144,22 @@ func (s *PodScraper) waitForPodRunning(ctx 
context.Context, namespace string, na
        }
        resourceClient, _, err := k8sclient.GetResourceClient(pod.APIVersion, 
pod.Kind, pod.Namespace)
        if err != nil {
-               return err
+               return "", err
        }
        watcher, err := resourceClient.Watch(metav1.ListOptions{
                FieldSelector: "metadata.name=" + pod.Name,
        })
        if err != nil {
-               return err
+               return "", err
        }
        events := watcher.ResultChan()
        for {
                select {
                case <-ctx.Done():
-                       return ctx.Err()
+                       return "", ctx.Err()
                case e, ok := <-events:
                        if !ok {
-                               return errors.New("event channel closed")
+                               return "", errors.New("event channel closed")
                        }
 
                        if e.Object != nil {
@@ -161,19 +170,34 @@ func (s *PodScraper) waitForPodRunning(ctx 
context.Context, namespace string, na
                                        pcopy := pod.DeepCopy()
                                        err := 
k8sutil.UnstructuredIntoRuntimeObject(&unstr, pcopy)
                                        if err != nil {
-                                               return err
+                                               return "", err
                                        }
 
                                        if pcopy.Status.Phase == v1.PodRunning {
-                                               return nil
+                                               return 
s.chooseContainer(pcopy), nil
                                        }
                                }
                        } else if e.Type == watch.Deleted || e.Type == 
watch.Error {
-                               return errors.New("unable to watch pod " + 
s.name)
+                               return "", errors.New("unable to watch pod " + 
s.name)
                        }
                case <-time.After(30 * time.Second):
-                       return errors.New("no state change after 30 seconds for 
pod " + s.name)
+                       return "", errors.New("no state change after 30 seconds 
for pod " + s.name)
                }
        }
 
 }
+
+func (s *PodScraper) chooseContainer(p *v1.Pod) string {
+       if p != nil {
+               if len(p.Spec.Containers) == 1 {
+                       // Let Kubernetes auto-detect
+                       return ""
+               }
+               for _, c := range p.Spec.Containers {
+                       if _, ok := commonUserContainerNames[c.Name]; ok {
+                               return c.Name
+                       }
+               }
+       }
+       return ""
+}

Reply via email to