Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/kar-controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ServerOption struct {
QuotaRestURL string
HealthProbeListenAddr string
DispatchResourceReservationTimeout int64
ExternalDispatch bool // if true, will use external plugin to dispatch workloads
}

// NewServerOption creates a new CMServer with a default config.
Expand Down Expand Up @@ -83,6 +84,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.")
fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'")
fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes")
fs.BoolVar(&s.ExternalDispatch,"externalDispatch", s.ExternalDispatch,"Use external workload dispatch plugin. Default is false.")

flag.Parse()
klog.V(4).Infof("[AddFlags] Controller configuration: %#v", s)
}
Expand Down Expand Up @@ -147,6 +150,12 @@ func (s *ServerOption) loadDefaultsFromEnvVars() {
s.DispatchResourceReservationTimeout = to
}
}
externalDispatch, envVarExists := os.LookupEnv("EXTERNAL_DISPATCH")
s.ExternalDispatch = false
if envVarExists && strings.EqualFold(externalDispatch, "true") {
s.ExternalDispatch = true
}

}

func (s *ServerOption) CheckOptionOrDie() {
Expand Down
37 changes: 29 additions & 8 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,24 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
}

func (qjm *XController) chooseAgent(qj *arbv1.AppWrapper) string {

if qjm.serverOption.ExternalDispatch {
clusterList := qj.Spec.SchedSpec.ClusterScheduling.Clusters
var clusterId = ""
// Target clusters were not defined by the submitter of the workload, so just pick a target
// from the known list of clusters provided in serverOption.AgentConfigs.
if len(clusterList) == 0 {
clusterId = qjm.agentList[rand.Int()%len(qjm.agentList)]
klog.V(1).Infof("ClusterId %s is chosen randomly from a list provided by mcad\n", clusterId)
} else {
// choose target clusterId at random
clusterId = clusterList[rand.Int()%len(clusterList)].Name
klog.V(1).Infof("ClusterId %s is chosen randomly from a list provided in Spec.SchedSpec.ClusterScheduling.Clusters: %s\n", clusterId, clusterList)
//qj.Status.TargetClusterName =
//qj.Status.TargetClusterName = clusterList[rand.Int()%len(clusterList)].Name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete at your convenience.

}
return clusterId;
}

qjAggrResources := qjm.GetAggregatedResources(qj)
klog.V(2).Infof("[chooseAgent] Aggregated Resources of XQJ %s: %v\n", qj.Name, qjAggrResources)
Expand Down Expand Up @@ -1922,7 +1940,6 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
defer func() {
klog.V(10).Infof("[worker-manageQJ] Ending %s manageQJ time=%s &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(startTime), qj, qj.ResourceVersion, qj.Status)
}()

if !cc.isDispatcher { // Agent Mode

if qj.DeletionTimestamp != nil {
Expand Down Expand Up @@ -2215,18 +2232,19 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
current_time := time.Now()
klog.V(10).Infof("[worker-manageQJ] XQJ %s has Overhead Before Dispatching: %s", qj.Name, current_time.Sub(qj.CreationTimestamp.Time))
klog.V(10).Infof("[TTime] %s, %s: WorkerBeforeDispatch", qj.Name, time.Now().Sub(qj.CreationTimestamp.Time))
}

}
queuejobKey, _ := GetQueueJobKey(qj)
// agentId:=cc.dispatchMap[queuejobKey]
// if agentId!=nil {
if agentId, ok := cc.dispatchMap[queuejobKey]; ok {
klog.V(10).Infof("[Dispatcher Controller] Dispatched AppWrapper %s to Agent ID: %s.", qj.Name, agentId)
cc.agentMap[agentId].CreateJob(qj)
if cc.serverOption.ExternalDispatch {
qj.Status.TargetClusterName = agentId
} else {
cc.agentMap[agentId].CreateJob(qj)
}
qj.Status.IsDispatched = true
} else {
klog.Errorf("[Dispatcher Controller] AppWrapper %s not found in dispatcher mapping.", qj.Name)
}
}
if klog.V(10).Enabled() {
current_time := time.Now()
klog.V(10).Infof("[Dispatcher Controller] XQJ %s has Overhead After Dispatching: %s", qj.Name, current_time.Sub(qj.CreationTimestamp.Time))
Expand Down Expand Up @@ -2277,7 +2295,10 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
if appwrapper.Status.IsDispatched {
queuejobKey, _ := GetQueueJobKey(appwrapper)
if obj, ok := cc.dispatchMap[queuejobKey]; ok {
cc.agentMap[obj].DeleteJob(appwrapper)
if !cc.serverOption.ExternalDispatch {
cc.agentMap[obj].DeleteJob(appwrapper)
}
delete(cc.dispatchMap,queuejobKey)
}
appwrapper.Status.IsDispatched = false
}
Expand Down