11import typing
2+ import time
3+ import uuid
24
35import pybbt
46import requests
@@ -21,7 +23,9 @@ def create_sync_opts(src: Redis, dst: Redis) -> typing.Dict:
2123 "redis_writer" : {
2224 "cluster" : dst .is_cluster (),
2325 "address" : dst .get_address ()
24- }
26+ },
27+ "__src" : src ,
28+ "__dst" : dst ,
2529 }
2630 return d
2731
@@ -35,32 +39,36 @@ def create_scan_opts(src: Redis, dst: Redis) -> typing.Dict:
3539 "redis_writer" : {
3640 "cluster" : dst .is_cluster (),
3741 "address" : dst .get_address ()
38- }
42+ },
43+ "__src" : src ,
44+ "__dst" : dst ,
3945 }
4046 return d
4147
4248 @staticmethod
43- def create_rdb_opts (rdb_path : str , dts : Redis ) -> typing .Dict :
49+ def create_rdb_opts (rdb_path : str , dst : Redis ) -> typing .Dict :
4450 d = {
4551 "rdb_reader" : {"filepath" : rdb_path },
4652 "redis_writer" : {
47- "cluster" : dts .is_cluster (),
48- "address" : dts .get_address ()
49- }
53+ "cluster" : dst .is_cluster (),
54+ "address" : dst .get_address ()
55+ },
56+ "__dst" : dst ,
5057 }
5158 return d
5259
5360 @staticmethod
54- def create_aof_opts (aof_path : str , dts : Redis , timestamp : int = 0 ) -> typing .Dict :
61+ def create_aof_opts (aof_path : str , dst : Redis , timestamp : int = 0 ) -> typing .Dict :
5562 d = {
5663 "aof_reader" : {"filepath" : aof_path , "timestamp" : timestamp },
5764 "redis_writer" : {
58- "cluster" : dts .is_cluster (),
59- "address" : dts .get_address ()
65+ "cluster" : dst .is_cluster (),
66+ "address" : dst .get_address ()
6067 },
6168 "advanced" : {
6269 "log_level" : "debug"
63- }
70+ },
71+ "__dst" : dst ,
6472 }
6573 return d
6674
@@ -70,6 +78,11 @@ def __init__(self, opts: typing.Dict):
7078 self .case_ctx = pybbt .get_case_context ()
7179 self .status_port = get_free_port ()
7280 self .status_url = f"http://localhost:{ self .status_port } "
81+
82+ # 提取并保存 src/dst 引用
83+ self .src = opts .pop ("__src" , None )
84+ self .dst = opts .pop ("__dst" , None )
85+
7386 opts ["advanced" ] = {"status_port" : self .status_port , "log_level" : "debug" }
7487
7588 self .dir = f"{ self .case_ctx .dir } /shake{ self .status_port } "
@@ -82,6 +95,9 @@ def __init__(self, opts: typing.Dict):
8295
8396 @staticmethod
8497 def run_once (opts : typing .Dict ):
98+ # 移除内部字段
99+ opts .pop ("__src" , None )
100+ opts .pop ("__dst" , None )
85101 status_port = get_free_port ()
86102 run_dir = f"{ pybbt .get_case_context ().dir } /shake{ status_port } "
87103 create_empty_dir (run_dir )
@@ -107,3 +123,59 @@ def _wait_start(self, timeout=5):
107123
108124 def is_consistent (self ):
109125 return self .get_status ()["consistent" ]
126+
127+ def wait_for_sync (self , timeout : float = 5 , db : int = 0 ):
128+ """
129+ Wait for data to be fully synced from src to dst.
130+
131+ 1. Wait for is_consistent() - RDB sync done
132+ 2. Check dbsize matches
133+ 3. Write marker key to src
134+ 4. Wait for marker to appear in dst
135+ 5. Delete marker key
136+ 6. Wait for consistent again
137+ """
138+ if self .src is None or self .dst is None :
139+ raise ValueError ("src and dst not available, use ShakeOpts.create_sync_opts() or create_scan_opts()" )
140+
141+ timer = Timer ()
142+
143+ # 1. Wait for is_consistent()
144+ while not self .is_consistent ():
145+ if timer .elapsed () > timeout :
146+ raise TimeoutError (f"is_consistent() not reached within { timeout } s" )
147+ time .sleep (0.01 )
148+
149+ # 2. Check dbsize (cluster doesn't support SELECT, use db 0)
150+ if not self .src .is_cluster ():
151+ self .src .do ("select" , db )
152+ if not self .dst .is_cluster ():
153+ self .dst .do ("select" , db )
154+ src_dbsize = self .src .dbsize ()
155+ while self .dst .dbsize () != src_dbsize :
156+ if timer .elapsed () > timeout :
157+ raise TimeoutError (f"dbsize mismatch within { timeout } s: src={ src_dbsize } , dst={ self .dst .dbsize ()} " )
158+ time .sleep (0.01 )
159+
160+ # 3. Write marker key
161+ marker_key = f"__shake_sync_marker_{ uuid .uuid4 ()} "
162+ marker_value = str (time .time ())
163+ self .src .do ("set" , marker_key , marker_value )
164+
165+ # 4. Wait for marker in dst
166+ while True :
167+ result = self .dst .do ("get" , marker_key )
168+ if result == marker_value .encode ():
169+ break
170+ if timer .elapsed () > timeout :
171+ raise TimeoutError (f"marker key not synced within { timeout } s" )
172+ time .sleep (0.01 )
173+
174+ # 5. Delete marker key
175+ self .src .do ("del" , marker_key )
176+
177+ # 6. Wait for consistent again
178+ while not self .is_consistent ():
179+ if timer .elapsed () > timeout :
180+ raise TimeoutError (f"final is_consistent() not reached within { timeout } s" )
181+ time .sleep (0.01 )
0 commit comments