2222import pprint
2323import warnings
2424import datetime
25+ import json
2526
2627import iptools
2728
@@ -63,6 +64,11 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True,
6364 group = self .ec2 .get_security_group (clname )
6465 cl = Cluster (ec2_conn = self .ec2 , cluster_tag = cltag ,
6566 cluster_group = group )
67+
68+ # Useful when config is on master node
69+ cl .key_location = \
70+ self .cfg .get_key (cl .master_node .key_name ).get ('key_location' )
71+
6672 if load_receipt :
6773 cl .load_receipt (load_plugins = load_plugins ,
6874 load_volumes = load_volumes )
@@ -420,6 +426,7 @@ def __init__(self,
420426 subnet_id = None ,
421427 public_ips = None ,
422428 plugins_order = [],
429+ config_on_master = False ,
423430 ** kwargs ):
424431 # update class vars with given vars
425432 _vars = locals ().copy ()
@@ -455,7 +462,6 @@ def __init__(self,
455462 self ._nodes = []
456463 self ._pool = None
457464 self ._progress_bar = None
458- self ._config_fields = None
459465 self .__default_plugin = None
460466 self .__sge_plugin = None
461467
@@ -588,10 +594,12 @@ def __str__(self):
588594 return pprint .pformat (cfg )
589595
590596 def print_config (self ):
591- config = {}
592- for key in self ._config_fields :
593- config [key ] = getattr (self , key )
594- pprint .pprint (config )
597+ core_settings , user_settings = self ._get_settings ()
598+ print "Core settings"
599+ print json .dumps (core_settings , indent = 1 , sort_keys = True )
600+ print
601+ print "User settings"
602+ print json .dumps (user_settings , indent = 1 , sort_keys = True )
595603
596604 def load_receipt (self , load_plugins = True , load_volumes = True ):
597605 """
@@ -608,8 +616,9 @@ def load_receipt(self, load_plugins=True, load_volumes=True):
608616 msg = user_msgs .version_mismatch % d
609617 sep = '*' * 60
610618 log .warn ('\n ' .join ([sep , msg , sep ]), extra = {'__textwrap__' : 1 })
611- self ._config_fields = self ._get_settings_from_tags ()
612- self .update (self ._config_fields )
619+ self .update (self ._get_settings_from_tags ())
620+ if self .config_on_master :
621+ self ._load_config_from_master ()
613622 if not (load_plugins or load_volumes ):
614623 return True
615624 try :
@@ -656,26 +665,6 @@ def __getstate__(self):
656665 def _security_group (self ):
657666 return static .SECURITY_GROUP_TEMPLATE % self .cluster_tag
658667
659- def save_core_settings (self , sg ):
660- core_settings = utils .dump_compress_encode (
661- dict (cluster_size = self .cluster_size ,
662- master_image_id = self .master_image_id ,
663- master_instance_type = self .master_instance_type ,
664- node_image_id = self .node_image_id ,
665- node_instance_type = self .node_instance_type ,
666- disable_queue = self .disable_queue ,
667- disable_cloudinit = self .disable_cloudinit ,
668- plugins_order = self .plugins_order ),
669- use_json = True )
670- sg .add_tag (static .CORE_TAG , core_settings )
671-
672- def save_user_settings (self , sg ):
673- user_settings = utils .dump_compress_encode (
674- dict (cluster_user = self .cluster_user ,
675- cluster_shell = self .cluster_shell , keyname = self .keyname ,
676- spot_bid = self .spot_bid ), use_json = True )
677- sg .add_tag (static .USER_TAG , user_settings )
678-
679668 @property
680669 def subnet (self ):
681670 if not self ._subnet and self .subnet_id :
@@ -730,9 +719,10 @@ def _add_chunked_tags(self, sg, chunks, base_tag_name):
730719 if tag not in sg .tags :
731720 sg .add_tag (tag , chunk )
732721
733- def _add_tags_to_sg (self , sg ):
734- if static .VERSION_TAG not in sg .tags :
735- sg .add_tag (static .VERSION_TAG , str (static .VERSION ))
722+ def _get_settings (self ):
723+ """
724+ The settings to save
725+ """
736726 core_settings = dict (cluster_size = self .cluster_size ,
737727 master_image_id = self .master_image_id ,
738728 master_instance_type = self .master_instance_type ,
@@ -743,16 +733,30 @@ def _add_tags_to_sg(self, sg):
743733 subnet_id = self .subnet_id ,
744734 public_ips = self .public_ips ,
745735 disable_queue = self .disable_queue ,
746- disable_cloudinit = self .disable_cloudinit )
736+ disable_cloudinit = self .disable_cloudinit ,
737+ plugins_order = self .plugins_order )
747738 user_settings = dict (cluster_user = self .cluster_user ,
748739 cluster_shell = self .cluster_shell ,
749740 keyname = self .keyname , spot_bid = self .spot_bid )
750- core = utils .dump_compress_encode (core_settings , use_json = True ,
751- chunk_size = static .MAX_TAG_LEN )
752- self ._add_chunked_tags (sg , core , static .CORE_TAG )
753- user = utils .dump_compress_encode (user_settings , use_json = True ,
754- chunk_size = static .MAX_TAG_LEN )
755- self ._add_chunked_tags (sg , user , static .USER_TAG )
741+ return core_settings , user_settings
742+
743+ def _add_tags_to_sg (self , sg ):
744+ if static .VERSION_TAG not in sg .tags :
745+ sg .add_tag (static .VERSION_TAG , str (static .VERSION ))
746+ if self .config_on_master :
747+ # the only info we store is the fact that config is on master
748+ core = utils .dump_compress_encode (
749+ dict (config_on_master = self .config_on_master ),
750+ use_json = True , chunk_size = static .MAX_TAG_LEN )
751+ self ._add_chunked_tags (sg , core , static .CORE_TAG )
752+ else :
753+ core_settings , user_settings = self ._get_settings ()
754+ core = utils .dump_compress_encode (core_settings , use_json = True ,
755+ chunk_size = static .MAX_TAG_LEN )
756+ self ._add_chunked_tags (sg , core , static .CORE_TAG )
757+ user = utils .dump_compress_encode (user_settings , use_json = True ,
758+ chunk_size = static .MAX_TAG_LEN )
759+ self ._add_chunked_tags (sg , user , static .USER_TAG )
756760
757761 def _load_chunked_tags (self , sg , base_tag_name ):
758762 tags = [i for i in sg .tags if i .startswith (base_tag_name )]
@@ -769,6 +773,34 @@ def _get_settings_from_tags(self, sg=None):
769773 cluster .update (self ._load_chunked_tags (sg , static .USER_TAG ))
770774 return cluster
771775
776+ def save_config_on_master (self ):
777+ """
778+ Vanilla Improvements function - save the config on the master node.
779+ For cluster saving their config on the master node rather than in
780+ the security group tags. No more chunk/hashing/splitting headaches.
781+ """
782+ settings , user_settings = self ._get_settings ()
783+ settings .update (user_settings )
784+ settings ["plugins" ] = self ._plugins
785+ config = self .master_node .ssh .remote_file (static .MASTER_CFG_FILE , 'wt' )
786+ json .dump (settings , config , indent = 4 , separators = (',' , ': ' ),
787+ sort_keys = True )
788+ config .close ()
789+
790+ def _load_config_from_master (self ):
791+ """
792+ Vanilla Improvements function - loads the config on the master node.
793+ """
794+ config = self .master_node .ssh .remote_file (static .MASTER_CFG_FILE , 'rt' )
795+ loaded_config = json .load (config )
796+ self .plugins_order = loaded_config ["plugins" ]
797+ self .update (loaded_config )
798+ config .close ()
799+ master = self .master_node
800+ self .plugins = self .load_plugins (
801+ master .get_plugins (self .plugins_order ))
802+ self .validate ()
803+
772804 @property
773805 def placement_group (self ):
774806 if self ._placement_group is None :
0 commit comments