Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sonic-ctrmgrd/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ tests/__pycache__/
ctrmgr/__pycache__/
venv
tests/.coverage*
.pytest_cache/
25 changes: 20 additions & 5 deletions src/sonic-ctrmgrd/ctrmgr/container
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ STATE = "state"

KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"
SERVER_TABLE = "KUBERNETES_MASTER"
SERVER_KEY = "SERVER"
ST_SER_CONNECTED = "connected"
ST_SER_UPDATE_TS = "update_time"

# Get seconds to wait for remote docker to start.
# If not, revert to local
Expand Down Expand Up @@ -75,8 +79,10 @@ def read_data(is_config, feature, fields):
ret = []

db = cfg_db if is_config else state_db

tbl = swsscommon.Table(db, FEATURE_TABLE)
if feature == SERVER_KEY:
tbl = swsscommon.Table(db, SERVER_TABLE)
else:
tbl = swsscommon.Table(db, FEATURE_TABLE)

data = dict(tbl.get(feature)[1])
for (field, default) in fields:
Expand Down Expand Up @@ -104,6 +110,13 @@ def read_state(feature):
[(CURRENT_OWNER, "none"), (REMOTE_STATE, "none"), (CONTAINER_ID, "")])


def read_server_state():
""" Read requried feature state """

return read_data(False, SERVER_KEY,
[(ST_SER_CONNECTED, "false"), (ST_SER_UPDATE_TS, "")])


def docker_action(action, feature, **kwargs):
""" Execute docker action """
try:
Expand Down Expand Up @@ -192,9 +205,10 @@ def container_start(feature, **kwargs):

set_owner, fallback, _ = read_config(feature)
_, remote_state, _ = read_state(feature)
server_connected, _ = read_server_state()

debug_msg("{}: set_owner:{} fallback:{} remote_state:{}".format(
feature, set_owner, fallback, remote_state))
debug_msg("{}: set_owner:{} fallback:{} remote_state:{} server_connected:{}".format(
feature, set_owner, fallback, remote_state, server_connected))

data = {
SYSTEM_STATE: "up",
Expand All @@ -207,8 +221,9 @@ def container_start(feature, **kwargs):
start_val = START_LOCAL
else:
start_val = START_KUBE
if fallback and (remote_state == "none"):
if fallback and (remote_state == "none" or server_connected == "false"):
start_val |= START_LOCAL
data[REMOTE_STATE] = "none"

if start_val == START_LOCAL:
# Implies *only* local.
Expand Down
16 changes: 8 additions & 8 deletions src/sonic-ctrmgrd/ctrmgr/container_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,14 @@ def container_up(feature, owner, version):
do_freeze(feature, "This version is marked disabled. Exiting ...")
return

if not instance_higher(feature, state_data[VERSION], version):
# TODO: May Remove label <feature_name>_<version>_enabled
# Else kubelet will continue to re-deploy every 5 mins, until
# master removes the lable to un-deploy.
#
do_freeze(feature, "bail out as current deploy version {} is not higher".
format(version))
return
# if not instance_higher(feature, state_data[VERSION], version):
Copy link
Collaborator

@qiluo-msft qiluo-msft Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not instance_higher(feature, state_data[VERSION], version):

Do not just comment code, remove them if they are dead. #Closed

# # TODO: May Remove label <feature_name>_<version>_enabled
# # Else kubelet will continue to re-deploy every 5 mins, until
# # master removes the lable to un-deploy.
# #
# do_freeze(feature, "bail out as current deploy version {} is not higher".
# format(version))
# return

update_data(state_db, feature, { VERSION: version })

Expand Down
75 changes: 70 additions & 5 deletions src/sonic-ctrmgrd/ctrmgr/ctrmgrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
CFG_SER_IP: "",
CFG_SER_PORT: "6443",
CFG_SER_DISABLE: "false",
CFG_SER_INSECURE: "false"
CFG_SER_INSECURE: "true"
}

dflt_st_ser = {
Expand Down Expand Up @@ -88,18 +88,20 @@
JOIN_LATENCY = "join_latency_on_boot_seconds"
JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"

remote_ctr_config = {
JOIN_LATENCY: 10,
JOIN_RETRY: 10,
LABEL_RETRY: 2,
TAG_IMAGE_LATEST: 30,
USE_K8S_PROXY: ""
}

def log_debug(m):
msg = "{}: {}".format(inspect.stack()[1][3], m)
print(msg)
#print(msg)
Copy link
Collaborator

@qiluo-msft qiluo-msft Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#print(msg)

Remove the debug code. #Closed

syslog.syslog(syslog.LOG_DEBUG, msg)


Expand Down Expand Up @@ -148,6 +150,8 @@ def init():
with open(SONIC_CTR_CONFIG, "r") as s:
d = json.load(s)
remote_ctr_config.update(d)
if UNIT_TESTING:
remote_ctr_config[TAG_IMAGE_LATEST] = 0


class MainServer:
Expand All @@ -172,11 +176,11 @@ def register_db(self, db_name):
self.db_connectors[db_name] = swsscommon.DBConnector(db_name, 0)


def register_timer(self, ts, handler):
def register_timer(self, ts, handler, args=()):
Copy link
Collaborator

@qiluo-msft qiluo-msft Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

args=()

Even it is allowed to use empty tuple (which is immutable) as default value, high suggest using None as default value. #Closed

""" Register timer based handler.
The handler will be called on/after give timestamp, ts
"""
self.timer_handlers[ts].append(handler)
self.timer_handlers[ts].append((handler, args))


def register_handler(self, db_name, table_name, handler):
Expand Down Expand Up @@ -235,7 +239,7 @@ def run(self):
lst = self.timer_handlers[k]
del self.timer_handlers[k]
for fn in lst:
fn()
fn[0](*fn[1])
else:
timeout = (k - ct_ts).seconds
break
Expand Down Expand Up @@ -426,6 +430,54 @@ def do_join(self, ip, port, insecure):
format(remote_ctr_config[JOIN_RETRY], self.start_time))


def tag_latest_image(server, feat, docker_id, image_ver):
res = 1
if not UNIT_TESTING:
status = os.system("docker ps |grep {} >/dev/null".format(docker_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure the running time of the container >= expected min time.
By the time, the timer fires, potentially it could have restarted.

Said that, systemd would have failed for frequent restarts.

Think through for any possible hole.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I set "tag_latest_image_on_wait_seconds" = 600s, it means when a k8s pod running for over 10 minutes(by monitor the pod's container id), I will tag the container's image. About this default value, do you have any ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be not. Suggest

  1. Try restarting service during this period
  2. Kill the main process in the container that will make the container exit within this period
  3. Any more .... ;)

Copy link
Contributor Author

@lixiaoyuner lixiaoyuner Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The situation you say should be:

  • k8s schedules container and remote_state is running
  • ctrmgrd capture remote_state ready -> running during last process, register timer_handler(600s, tag_function(previous_docker_id))
  • during the 600s, systemctl restart feature

The current reactions are:

  • feature systemd service will stop -> start -> wait, when stop, set remote_state=stopped, when start, not docker start, when wait, wait docker_id
  • k8s finds its scheduled container is killed, try to reschedule, container_up set remote_state=pending
  • ctrmgrd captures remote_state=pending, set remote_state=ready
  • container_up set remote_state=running and set docker_id
  • ctrmgrd capture remote_state ready -> running, will register timer_handler(600s, tag_function(current_docker_id))
  • and tag_latest_image_on_wait_seconds is up, tag function finds the previous_docker_id is disappeared, will do nothing
  • and tag_latest_image_on_wait_seconds is up, tag function finds the current_docker_id, tag latest.

Have tested this case and some other cases, all good.
Any other corner cases you want me to test?

And by the way, about removing 'tag as local' from reboot script, you want me to do it in the PR or in the future? I want to do in the future, because it seems redundant, but no real impacts. And it can ensure tag as local when reboot, maybe necessary for us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer we do it in this PR, as this PR's major purpose is to tag local.

One more Q: After you tag as "local" when you do service restart, who wins, systemd or kube.

I prefer systemd takes over. This would add the new version in label that should auto-block kube from deploying.
Can you check this out too?

Copy link
Contributor Author

@lixiaoyuner lixiaoyuner Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tag as local is in reboot script in sonic-utilities repo, removing from reboot script, PR is here. #Remove tag as local

Q: After you tag as "local" when do service restart, who wins?
Kube wins.
I see label such as feature_version_enabled=false will be set only when containers start up as local. Actually, when we restart the service, systemd stop will set remote_state=stopped so that systemd will wait k8s to reschedule. K8s finds its container is down, will reschedule immediately. In this process, container has no chance to start up as local to set feature_version_enabled=false. So, there is no auto-block.
About this problem, I think it's right that service is taken over by kube in this situation, because the set-owner is kube and k8s is connected. Taken over by kube should be the proper action.

By the way. I want to remove the instance_higher checker, because we want to support fallback in the future, if post-check failed, we could fall back to the previous version, I think the version should be managed by scheduler. What's your concern about this before, is there any necessary things so that we always need the higher version? #remove in this commit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two points:

  1. Please merge the reboot fix PR into this PR -- Like to keep all related in one PR.
    For example, if we revert one, it should affect both. it should both either stay or go as one piece.
  2. I agree with your explanation. Earlier tagging upon reboot only did not need any change, as state-db starts empty. Now that we are preponing the tagging, we need to do required updates that systemd takes over, if kube image is tagged as latest. In fact, if it makes simple, we can change from "kube running" to "kube-transient" once you tag it latest. On next systemd restart, seeing it kube-transient" we can go local path.

No need for kube to manage after local tagging. It is more stable if systemd takes over.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point 1: About the two PRs, the reason is that the reboot script is in another repo called sonic-utilities, seems we can't raise a PR including two repos code.

Point 2: About after tagging, it's easy to let feature go back to local after restart, but one of our visions is that k8s can collect the containers real-time metrics, if features are taken over by systemd after tagging, we can't keep collecting the metrics.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Point 1 -- Agree.
Point 2 -- Debatable. What happens if device gets rebooted ? We have STATE-DB for counters/metrics.

Let it be so for now.

if status:
syslog.syslog(syslog.LOG_ERR,
"Feature {}:{} is not stable".format(feat, image_ver))
else:
image_item = os.popen("docker inspect {} |jq -r .[].Image".format(docker_id)).read().strip()
if image_item:
image_id = image_item.split(":")[1][:12]
image_info = os.popen("docker images |grep {}".format(image_id)).read().split()
if image_info:
image_rep = image_info[0]
res = os.system("docker tag {} {}:latest".format(image_id, image_rep))
if res != 0:
syslog.syslog(syslog.LOG_ERR,
"Failed to tag {}:{} to latest".format(image_rep, image_ver))
else:
syslog.syslog(syslog.LOG_INFO,
"Successfully tag {}:{} to latest".format(image_rep, image_ver))
feat_status = os.popen("docker inspect {} |jq -r .[].State.Running".format(feat)).read().strip()
if feat_status:
if feat_status == 'true':
os.system("docker stop {}".format(feat))
syslog.syslog(syslog.LOG_ERR,
"{} should not run, stop it".format(feat))
os.system("docker rm {}".format(feat))
syslog.syslog(syslog.LOG_INFO,
"Delete previous {} container".format(feat))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to docker images |grep {} to get image repo".format(image_id))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to inspect container:{} to get image id".format(docker_id))
else:
server.mod_db_entry(STATE_DB_NAME,
FEATURE_TABLE, feat, {"tag_latest": "true"})
res = 0
if res:
log_debug("failed to tag {}:{} to latest".format(feat, image_ver))
else:
log_debug("successfully tag {}:{} to latest".format(feat, image_ver))

return res


#
# Feature changes
#
Expand Down Expand Up @@ -523,6 +575,19 @@ def on_state_update(self, key, op, data):
self.st_data[key] = _update_entry(dflt_st_feat, data)
remote_state = self.st_data[key][ST_FEAT_REMOTE_STATE]

if (old_remote_state != remote_state) and (remote_state == "running"):
# Tag latest
start_time = datetime.datetime.now() + datetime.timedelta(
seconds=remote_ctr_config[TAG_IMAGE_LATEST])
self.server.register_timer(start_time, tag_latest_image, (
self.server,
key,
self.st_data[key][ST_FEAT_CTR_ID],
self.st_data[key][ST_FEAT_CTR_VER]))

log_debug("try to tag latest label after {} seconds @{}".format(
remote_ctr_config[TAG_IMAGE_LATEST], start_time))

if (not init) and (
(old_remote_state == remote_state) or (remote_state != "pending")):
# no change or nothing to do.
Expand Down
8 changes: 4 additions & 4 deletions src/sonic-ctrmgrd/ctrmgr/kube_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _run_command(cmd, timeout=5):

def kube_read_labels():
""" Read current labels on node and return as dict. """
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels |tr -s ' ' | cut -f6 -d' '"
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '"

labels = {}
ret, out, _ = _run_command(KUBECTL_GET_CMD.format(
Expand Down Expand Up @@ -332,12 +332,12 @@ def _do_reset(pending_join = False):


def _do_join(server, port, insecure):
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {} --apiserver-advertise-address {}"
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {}"
err = ""
out = ""
ret = 0
try:
local_ipv6 = _get_local_ipv6()
#local_ipv6 = _get_local_ipv6()
#_download_file(server, port, insecure)
Copy link
Collaborator

@qiluo-msft qiluo-msft Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete dead code. #Closed

_gen_cli_kubeconf(server, port, insecure)
_do_reset(True)
Expand All @@ -349,7 +349,7 @@ def _do_join(server, port, insecure):

if ret == 0:
(ret, out, err) = _run_command(KUBEADM_JOIN_CMD.format(
KUBE_ADMIN_CONF, get_device_name(), local_ipv6), timeout=60)
KUBE_ADMIN_CONF, get_device_name()), timeout=60)
log_debug("ret = {}".format(ret))

except IOError as e:
Expand Down
1 change: 1 addition & 0 deletions src/sonic-ctrmgrd/ctrmgr/remote_ctr.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"retry_join_interval_seconds": 30,
"retry_labels_update_seconds": 5,
"revert_to_local_on_wait_seconds": 60,
"tag_latest_image_on_wait_seconds": 600,
"use_k8s_as_http_proxy": "n"
}

2 changes: 1 addition & 1 deletion src/sonic-ctrmgrd/tests/container_startup_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
common_test.FEATURE_TABLE: {
"snmp": {
"container_id": "no_change",
"container_version": "20201230.77",
"container_version": "20201230.11",
"current_owner": "no_change",
"remote_state": "no_change",
"system_state": "up"
Expand Down
5 changes: 5 additions & 0 deletions src/sonic-ctrmgrd/tests/container_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
"current_owner": "none",
"container_id": ""
}
},
common_test.SERVER_TABLE: {
"SERVER": {
"connected": "true"
}
}
}
},
Expand Down
49 changes: 47 additions & 2 deletions src/sonic-ctrmgrd/tests/ctrmgrd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
}
}
},
Expand Down Expand Up @@ -151,7 +151,7 @@
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
},
common_test.KUBE_RESET: {
"flag": "true"
Expand Down Expand Up @@ -276,6 +276,51 @@
}
}
}
},
3: {
common_test.DESCR: "Tag image latest when remote_state changes to running",
common_test.ARGS: "ctrmgrd",
common_test.PRE: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "pending"
}
}
}
},
common_test.UPD: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "running"
}
}
}
},
common_test.POST: {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"tag_latest": "true"
}
}
}
}
}
}

Expand Down
Loading