Skip to content

Commit 89f83b4

Browse files
committed
no bug code
1 parent c60aeca commit 89f83b4

8 files changed

Lines changed: 152 additions & 100 deletions

File tree

scripts/deploy_single_node/node_config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ nodes:
33
addr: 127.0.0.1:2600
44
spec: [meta,master]
55
2:
6-
addr: 127.0.0.1:2605
6+
addr: 192.168.31.240:2602
77
spec: [meta,worker]

src/main/src/general/app/mod.rs

Lines changed: 70 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -781,36 +781,36 @@ impl AppMetaManager {
781781
// let appdir = self.fs_layer.concat_app_dir(app);
782782
let appmeta = self.fs_layer.read_app_meta(tmpapp).await?;
783783

784-
// // TODO: 2.check project dir
785-
// // 3. if java, take snapshot
786-
// if let AppType::Jar = appmeta.app_type {
787-
// let _ = self
788-
// .meta
789-
// .write()
790-
// .await
791-
// .tmp_app_metas
792-
// .insert(tmpapp.to_owned(), appmeta.clone());
793-
// tracing::debug!("record app meta to make checkpoint {}", tmpapp);
794-
// self.view
795-
// .instance_manager()
796-
// .make_checkpoint_for_app(tmpapp)
797-
// .await?;
798-
// self.view
799-
// .instance_manager()
800-
// .drap_app_instances(tmpapp)
801-
// .await;
802-
// // remove app_meta
803-
// tracing::debug!("checkpoint made, remove app meta {}", tmpapp);
804-
// let _ = self
805-
// .meta
806-
// .write()
807-
// .await
808-
// .tmp_app_metas
809-
// .remove(tmpapp)
810-
// .unwrap_or_else(|| {
811-
// panic!("remove app meta failed, app: {}", tmpapp);
812-
// });
813-
// }
784+
// TODO: 2.check project dir
785+
// 3. if java, take snapshot
786+
if let AppType::Jar = appmeta.app_type {
787+
let _ = self
788+
.meta
789+
.write()
790+
.await
791+
.tmp_app_metas
792+
.insert(tmpapp.to_owned(), appmeta.clone());
793+
tracing::debug!("record app meta to make checkpoint {}", tmpapp);
794+
self.view
795+
.instance_manager()
796+
.make_checkpoint_for_app(tmpapp)
797+
.await?;
798+
self.view
799+
.instance_manager()
800+
.drap_app_instances(tmpapp)
801+
.await;
802+
// remove app_meta
803+
tracing::debug!("checkpoint made, remove app meta {}", tmpapp);
804+
let _ = self
805+
.meta
806+
.write()
807+
.await
808+
.tmp_app_metas
809+
.remove(tmpapp)
810+
.unwrap_or_else(|| {
811+
panic!("remove app meta failed, app: {}", tmpapp);
812+
});
813+
}
814814

815815
Ok(appmeta)
816816
}
@@ -1072,29 +1072,48 @@ impl AppMetaManager {
10721072

10731073
pub fn set_app_meta_list(&self, list: Vec<String>) {
10741074
//发送逻辑处理 曾俊
1075-
self.view
1076-
.kv_store_engine()
1077-
.set(
1078-
KeyTypeServiceList,
1079-
&serde_json::to_string(&list).unwrap().into(),
1080-
false,
1081-
)
1082-
.todo_handle("This part of the code needs to be implemented.");
1083-
}
1084-
pub fn get_app_meta_list(&self) -> Vec<String> {
1085-
let res = self
1086-
.view
1075+
// self.view
1076+
// .kv_store_engine()
1077+
// .set(
1078+
// KeyTypeServiceList,
1079+
// &serde_json::to_string(&list).unwrap().into(),
1080+
// false,
1081+
// )
1082+
// .todo_handle("This part of the code needs to be implemented.");
1083+
1084+
//修改后代码:对set函数的返回类型进行处理 曾俊
1085+
match self.view
10871086
.kv_store_engine()
1088-
.get(&KeyTypeServiceList, false, KvAdditionalConf {})
1089-
.map(|(_version, list)| list)
1090-
.unwrap_or_else(|| {
1091-
return vec![];
1092-
});
1093-
serde_json::from_slice(&res).unwrap_or_else(|e| {
1094-
tracing::warn!("parse app meta list failed, err: {:?}", e);
1095-
vec![]
1096-
})
1087+
.set(
1088+
KeyTypeServiceList,
1089+
&serde_json::to_string(&list).unwrap().into(),
1090+
false,
1091+
) {
1092+
Ok((version, _)) => {
1093+
tracing::debug!("App meta list updated successfully, version: {}, list: {:?}", version, list);
1094+
},
1095+
Err(e) => {
1096+
tracing::error!("Failed to set app meta list: {:?}", e);
1097+
}
10971098
}
1099+
}
1100+
1101+
pub fn get_app_meta_list(&self) -> Vec<String> {
1102+
let res = self
1103+
.view
1104+
.kv_store_engine()
1105+
.get(&KeyTypeServiceList, false, KvAdditionalConf {})
1106+
.map(|(_version, list)| list)
1107+
.unwrap_or_else(|| {
1108+
return vec![];
1109+
});
1110+
serde_json::from_slice(&res).unwrap_or_else(|e| {
1111+
tracing::warn!("parse app meta list failed, err: {:?}", e);
1112+
vec![]
1113+
})
1114+
}
1115+
1116+
10981117

10991118
// pub fn get_app_meta_basicinfo_list(&self) -> Vec<ServiceBasic> {
11001119
// let apps = self.get_app_meta_list();

src/main/src/general/data/m_data_general/mod.rs

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -660,25 +660,30 @@ impl DataGeneral {
660660
let message = "New data version overwrite".to_owned();
661661
tracing::warn!("{}", message);
662662

663-
responsor //返回结果未处理 曾俊
663+
if let Err(e) = responsor //返回结果未处理 曾俊
664664
.send_resp(WriteOneDataResponse {
665665
remote_version: 0,
666666
success: false,
667667
message,
668668
})
669-
.await
670-
.todo_handle("1 err_comment waitting to fill");
669+
.await{
670+
tracing::error!("Failed to send write one data response 1: {}", e);
671+
}
672+
// .todo_handle("1 err_comment waitting to fill");
673+
671674
};
672675
let fail_with_msg = |message: String| async {
673676
tracing::warn!("{}", message);
674-
responsor //返回结果未处理 曾俊
677+
if let Err(e) = responsor //返回结果未处理 曾俊
675678
.send_resp(WriteOneDataResponse {
676679
remote_version: 0,
677680
success: false,
678681
message,
679682
})
680-
.await
681-
.todo_handle("2 err_comment waitting to fill");
683+
.await {
684+
tracing::error!("Failed to send write one data response 2 : {}", e);
685+
}
686+
// .todo_handle("2 err_comment waitting to fill");
682687
};
683688

684689
loop {
@@ -779,7 +784,7 @@ impl DataGeneral {
779784
|| check_meta.as_ref().unwrap().0 != required_meta.as_ref().unwrap().0
780785
{
781786
drop(guard);
782-
responsor //返回结果未处理 曾俊
787+
if let Err(e) = responsor //返回结果未处理 曾俊
783788
.send_resp(WriteOneDataResponse {
784789
remote_version: if check_meta.is_none() {
785790
0
@@ -789,8 +794,10 @@ impl DataGeneral {
789794
success: false,
790795
message: "meta is updated again, cancel write".to_owned(),
791796
})
792-
.await
793-
.todo_handle("3 err_comment waitting to fill");
797+
.await{
798+
tracing::error!("Failed to send write one data response 3: {}", e);
799+
}
800+
// .todo_handle("3 err_comment waitting to fill");
794801
return;
795802
}
796803

@@ -820,14 +827,16 @@ impl DataGeneral {
820827
kv_store_engine.flush();
821828
drop(guard);
822829
tracing::debug!("data partial is written");
823-
responsor //返回结果未使用 曾俊
830+
if let Err(e) = responsor //返回结果未使用 曾俊
824831
.send_resp(WriteOneDataResponse {
825832
remote_version: req.version,
826833
success: true,
827834
message: "".to_owned(),
828835
})
829-
.await
830-
.todo_handle("4 err_comment waitting to fill");
836+
.await{
837+
tracing::error!("Failed to send write one data response 4: {}", e);
838+
}
839+
// .todo_handle("4 err_comment waitting to fill");
831840
}
832841

833842
async fn rpc_handle_data_meta_update(
@@ -867,57 +876,69 @@ impl DataGeneral {
867876
drop(_kv_write_lock_guard);
868877
let err_msg = "New data version is smaller, failed update";
869878
tracing::warn!("{}", err_msg);
870-
responsor //返回结果未处理 曾俊
879+
if let Err(e) = responsor //返回结果未处理 曾俊
871880
.send_resp(proto::DataMetaUpdateResponse {
872881
version: old_meta.version,
873882
message: err_msg.to_owned(),
874883
})
875-
.await
876-
.todo_handle("5 err_comment waitting to fill");
884+
.await{
885+
tracing::error!("Failed to send data meta update response 5: {}", e);
886+
}
887+
// .todo_handle("5 err_comment waitting to fill");
877888
return;
878889
}
879890
old_meta.version = req.version;
880891
if req.serialized_meta.len() > 0 {
881-
self.view.kv_store_engine() //返回结果未处理 曾俊
882-
.set_raw(&keybytes, std::mem::take(&mut req.serialized_meta), true)
883-
.todo_handle("6 err_comment waitting to fill");
892+
if let Err(e) = self.view.kv_store_engine() //返回结果未处理 曾俊
893+
.set_raw(&keybytes, std::mem::take(&mut req.serialized_meta), true){
894+
tracing::error!("Failed to set raw data in kv store 6: {}", e);
895+
}
896+
// .todo_handle("6 err_comment waitting to fill");
884897
} else {
885-
self.view.kv_store_engine() //返回结果未处理 曾俊
886-
.set(key, &old_meta, true)
887-
.todo_handle("7 err_comment waitting to fill");
898+
if let Err(e) = self.view.kv_store_engine() //返回结果未处理 曾俊
899+
.set(key, &old_meta, true){
900+
tracing::error!("Failed to set raw data in kv store 7: {}", e);
901+
}
902+
// .todo_handle("7 err_comment waitting to fill");
888903
}
889904
} else {
890905
if req.serialized_meta.len() > 0 {
891906
tracing::debug!(
892907
"set new meta data, {:?}",
893908
bincode::deserialize::<DataSetMeta>(&req.serialized_meta)
894909
);
895-
self.view.kv_store_engine() //返回结果未处理 曾俊
896-
.set_raw(&keybytes, std::mem::take(&mut req.serialized_meta), true)
897-
.todo_handle("8 err_comment waitting to fill");
910+
if let Err(e) = self.view.kv_store_engine() //返回结果未处理 曾俊
911+
.set_raw(&keybytes, std::mem::take(&mut req.serialized_meta), true){
912+
tracing::error!("Failed to set raw data in kv store 8: {}", e);
913+
}
914+
// .todo_handle("8 err_comment waitting to fill");
898915
} else {
899916
drop(_kv_write_lock_guard);
900917
let err_msg = "Old meta data not found and missing new meta";
901918
tracing::warn!("{}", err_msg);
902-
responsor //返回结果未处理 曾俊
919+
if let Err(e) = responsor //返回结果未处理 曾俊
903920
.send_resp(proto::DataMetaUpdateResponse {
904921
version: 0,
905922
message: err_msg.to_owned(),
906923
})
907-
.await
908-
.todo_handle("9 err_comment waitting to fill");
924+
.await{
925+
tracing::error!("Failed to send data meta update response 9: {}", e);
926+
}
927+
// .todo_handle("9 err_comment waitting to fill");
909928
return;
910929
}
911930
}
912931
drop(_kv_write_lock_guard);
913932
tracing::debug!("rpc_handle_data_meta_update success");
914-
responsor //返回结果未处理 曾俊
933+
if let Err(e) = responsor //返回结果未处理 曾俊
915934
.send_resp(proto::DataMetaUpdateResponse {
916935
version: req.version,
917936
message: "Update success".to_owned(),
918937
})
919-
.await
920-
.todo_handle("10 err_comment waitting to fill");
938+
.await{
939+
tracing::error!("Failed to send data meta update response 10: {}", e);
940+
}
941+
// .todo_handle("10 err_comment waitting to fill");
921942
}
922943

923944
async fn rpc_handle_get_data_meta(
@@ -1588,9 +1609,11 @@ impl LogicalModule for DataGeneral {
15881609
.regist(p2p, move |responsor, req| {
15891610
let view = view.clone();
15901611
let _ = tokio::spawn(async move {
1591-
view.data_general().rpc_handle_get_data_meta(req, responsor) //返回结果未处理 曾俊
1592-
.await
1593-
.todo_handle("rpc_handle_get_data_meta err");
1612+
if let Err(e) = view.data_general().rpc_handle_get_data_meta(req, responsor) //返回结果未处理 曾俊
1613+
.await{
1614+
tracing::error!("Failed to handle get data meta: {}", e);
1615+
}
1616+
// .todo_handle("rpc_handle_get_data_meta err");
15941617
});
15951618
Ok(())
15961619
});

src/main/src/general/m_os/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ impl OperatingSystem {
254254
})
255255
.await
256256
.unwrap();
257-
responser.send_resp(res).await.todo_handle("This part of the code needs to be implemented."); //返回结果未处理 曾俊
257+
if let Err(e) = responser.send_resp(res).await {
258+
tracing::error!("Failed to send run cmd response: {}", e);
259+
}
258260
}
259261

260262
async fn remote_get_dir_content_handler(
@@ -298,9 +300,7 @@ impl OperatingSystem {
298300
get_dir_content_resp::GetDirContentRespOk { files, dirs },
299301
)),
300302
}
301-
// 在这里使用 responser 将 dir_contents 发送回调用方
302303
} else {
303-
// 发生读取目录错误,可以选择使用 responser 发送错误消息
304304
GetDirContentResp {
305305
dispatch: Some(get_dir_content_resp::Dispatch::Fail(
306306
GetDirContentRespFail {
@@ -321,7 +321,9 @@ impl OperatingSystem {
321321
})
322322
.await
323323
.unwrap();
324-
responser.send_resp(res).await.todo_handle("This part of the code needs to be implemented."); //返回结果未处理 曾俊
324+
if let Err(e) = responser.send_resp(res).await {
325+
tracing::error!("Failed to send get dir content response: {}", e);
326+
}
325327
}
326328

327329
pub fn open_file(&self, fname: &str) -> WSResult<i32> {

src/main/src/general/network/m_p2p.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,16 +415,18 @@ impl P2PModule {
415415
let _ = self
416416
.waiting_tasks
417417
.insert((taskid, node_id), Some(tx).into());
418-
self.dispatch(
418+
if let Err(e) = self.dispatch( //返回结果未处理 曾俊
419419
node_id,
420420
r.msg_id(),
421421
taskid,
422422
DispatchPayload::Local(Box::new(r)),
423-
)
423+
){
424+
tracing::error!("Failed to dispatch rpc: {}", e);
425+
}
424426
//.todo_handle();
425427
//虞光勇修改,修改原因:在调用 todo_handle 方法时遇到了缺少参数的问题。需要确保在调用 todo_handle 方法时提供所需的字符串参数。
426428
//修改内容:加入字符串参数。
427-
.todo_handle("This part of the code needs to be implemented."); //返回结果未处理 曾俊
429+
// .todo_handle("This part of the code needs to be implemented.");
428430
let resp = rx.await.unwrap();
429431
let resp = resp.downcast::<RESP>().unwrap();
430432

src/main/src/general/network/m_p2p_quic.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,10 @@ async fn handle_connection(
361361
match deserialize_msg_id_task_id(&head) {
362362
Ok((msg_id, task_id)) => {
363363
//返回结果未处理 曾俊
364-
view.p2p().dispatch(remote_id, msg_id, task_id, bytes.into()).todo_handle("This part of the code needs to be implemented.");
364+
if let Err(e) = view.p2p().dispatch(remote_id, msg_id, task_id, bytes.into()){
365+
tracing::error!("Failed to dispatch rpc: {}", e);
366+
}
367+
// .todo_handle("This part of the code needs to be implemented.");
365368
}
366369
Err(err) => {
367370
tracing::warn!("incoming deserial head error: {:?}", err);

0 commit comments

Comments
 (0)