@@ -17,8 +17,11 @@ limitations under the License.
1717package v1beta1
1818
1919import (
20+ "bytes"
21+ "context"
2022 "encoding/json"
2123 "errors"
24+ "io"
2225 "log"
2326 "net/http"
2427 "path/filepath"
@@ -29,10 +32,19 @@ import (
2932
3033 experimentv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
3134 suggestionv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1"
35+ trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
3236 api_pb_v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1"
3337 consts "github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
3438 "github.com/kubeflow/katib/pkg/util/v1beta1/katibclient"
3539 corev1 "k8s.io/api/core/v1"
40+
41+ common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
42+ mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common"
43+ apiv1 "k8s.io/api/core/v1"
44+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
45+ "k8s.io/apimachinery/pkg/types"
46+ "k8s.io/client-go/kubernetes"
47+ "sigs.k8s.io/controller-runtime/pkg/client/config"
3648)
3749
3850func NewKatibUIHandler (dbManagerAddr string ) * KatibUIHandler {
@@ -574,3 +586,149 @@ func (k *KatibUIHandler) FetchTrial(w http.ResponseWriter, r *http.Request) {
574586 return
575587 }
576588}
589+
590+ // FetchTrialLogs fetches logs for a trial in specific namespace.
591+ func (k * KatibUIHandler ) FetchTrialLogs (w http.ResponseWriter , r * http.Request ) {
592+ namespaces , ok := r .URL .Query ()["namespace" ]
593+ if ! ok {
594+ log .Printf ("No namespace provided in Query parameters! Provide a 'namespace' param" )
595+ err := errors .New ("no 'namespace' provided" )
596+ http .Error (w , err .Error (), http .StatusBadRequest )
597+ return
598+ }
599+
600+ trialNames , ok := r .URL .Query ()["trialName" ]
601+ if ! ok {
602+ log .Printf ("No trialName provided in Query parameters! Provide a 'trialName' param" )
603+ err := errors .New ("no 'trialName' provided" )
604+ http .Error (w , err .Error (), http .StatusBadRequest )
605+ return
606+ }
607+
608+ trialName := trialNames [0 ]
609+ namespace := namespaces [0 ]
610+
611+ user , err := IsAuthorized (consts .ActionTypeGet , namespace , consts .PluralTrial , "" , trialName , trialsv1beta1 .SchemeGroupVersion , k .katibClient .GetClient (), r )
612+ if user == "" && err != nil {
613+ log .Printf ("No user provided in kubeflow-userid header." )
614+ http .Error (w , err .Error (), http .StatusUnauthorized )
615+ return
616+ } else if err != nil {
617+ log .Printf ("The user: %s is not authorized to get trial: %s in namespace: %s \n " , user , trialName , namespace )
618+ http .Error (w , err .Error (), http .StatusForbidden )
619+ return
620+ }
621+
622+ trial := & trialsv1beta1.Trial {}
623+ if err := k .katibClient .GetClient ().Get (context .Background (), types.NamespacedName {Name : trialName , Namespace : namespace }, trial ); err != nil {
624+ log .Printf ("GetLogs failed: %v" , err )
625+ http .Error (w , err .Error (), http .StatusInternalServerError )
626+ return
627+ }
628+
629+ // TODO: Use controller-runtime client instead of kubernetes client to get logs, once this is available
630+ clientset , err := createKubernetesClientset ()
631+ if err != nil {
632+ log .Printf ("GetLogs failed: %v" , err )
633+ http .Error (w , err .Error (), http .StatusInternalServerError )
634+ return
635+ }
636+
637+ podName , err := fetchMasterPodName (clientset , trial )
638+ if err != nil {
639+ log .Printf ("GetLogs failed: %v" , err )
640+ http .Error (w , err .Error (), http .StatusInternalServerError )
641+ return
642+ }
643+
644+ user , err = IsAuthorized (consts .ActionTypeGet , namespace , corev1 .ResourcePods .String (), "log" , podName , corev1 .SchemeGroupVersion , k .katibClient .GetClient (), r )
645+ if user == "" && err != nil {
646+ log .Printf ("No user provided in kubeflow-userid header." )
647+ http .Error (w , err .Error (), http .StatusUnauthorized )
648+ return
649+ } else if err != nil {
650+ log .Printf ("The user: %s is not authorized to get pod logs: %s in namespace: %s \n " , user , podName , namespace )
651+ http .Error (w , err .Error (), http .StatusForbidden )
652+ return
653+ }
654+
655+ podLogOpts := apiv1.PodLogOptions {}
656+ podLogOpts .Container = trial .Spec .PrimaryContainerName
657+ if trial .Spec .MetricsCollector .Collector .Kind == common .StdOutCollector {
658+ podLogOpts .Container = mccommon .MetricLoggerCollectorContainerName
659+ }
660+
661+ logs , err := fetchPodLogs (clientset , namespace , podName , podLogOpts )
662+ if err != nil {
663+ log .Printf ("GetLogs failed: %v" , err )
664+ http .Error (w , err .Error (), http .StatusInternalServerError )
665+ return
666+ }
667+ response , err := json .Marshal (logs )
668+ if err != nil {
669+ log .Printf ("Marshal logs failed: %v" , err )
670+ http .Error (w , err .Error (), http .StatusInternalServerError )
671+ return
672+ }
673+ if _ , err = w .Write (response ); err != nil {
674+ log .Printf ("Write logs failed: %v" , err )
675+ http .Error (w , err .Error (), http .StatusInternalServerError )
676+ return
677+ }
678+ }
679+
680+ // createKubernetesClientset returns kubernetes clientset
681+ func createKubernetesClientset () (* kubernetes.Clientset , error ) {
682+ cfg , err := config .GetConfig ()
683+ if err != nil {
684+ return nil , err
685+ }
686+ clientset , err := kubernetes .NewForConfig (cfg )
687+ if err != nil {
688+ return nil , err
689+ }
690+ return clientset , nil
691+ }
692+
693+ // fetchMasterPodName returns name of the master pod for a trial
694+ func fetchMasterPodName (clientset * kubernetes.Clientset , trial * trialsv1beta1.Trial ) (string , error ) {
695+ selectionLabel := consts .LabelTrialName + "=" + trial .ObjectMeta .Name
696+ for primaryKey , primaryValue := range trial .Spec .PrimaryPodLabels {
697+ selectionLabel = selectionLabel + "," + primaryKey + "=" + primaryValue
698+ }
699+
700+ podList , err := clientset .CoreV1 ().Pods (trial .ObjectMeta .Namespace ).List (context .Background (), metav1.ListOptions {LabelSelector : selectionLabel })
701+ if err != nil {
702+ return "" , err
703+ }
704+
705+ if len (podList .Items ) == 0 {
706+ return "" , errors .New (`Logs for the trial could not be found.
707+ Was 'retain: true' specified in the Experiment definition?
708+ An example can be found here: https://github.com/kubeflow/katib/blob/7bf39225f7235ee4ba6cf285ecc2c455c6471234/examples/v1beta1/argo/argo-workflow.yaml#L33` )
709+ }
710+ if len (podList .Items ) > 1 {
711+ return "" , errors .New ("More than one master replica found" )
712+ }
713+
714+ return podList .Items [0 ].Name , nil
715+ }
716+
717+ // fetchPodLogs returns logs of a pod for the given job name and namespace
718+ func fetchPodLogs (clientset * kubernetes.Clientset , namespace string , podName string , podLogOpts apiv1.PodLogOptions ) (string , error ) {
719+ req := clientset .CoreV1 ().Pods (namespace ).GetLogs (podName , & podLogOpts )
720+ podLogs , err := req .Stream (context .Background ())
721+ if err != nil {
722+ return "" , err
723+ }
724+ defer podLogs .Close ()
725+
726+ buf := new (bytes.Buffer )
727+ _ , err = io .Copy (buf , podLogs )
728+ if err != nil {
729+ return "" , err
730+ }
731+ str := buf .String ()
732+
733+ return str , nil
734+ }
0 commit comments