|
8 | 8 | "encoding/binary" |
9 | 9 | "errors" |
10 | 10 | "fmt" |
11 | | - "math" |
12 | 11 | "math/big" |
13 | 12 | "os" |
14 | 13 | "path/filepath" |
@@ -478,7 +477,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed |
478 | 477 | broadcastclient.FeedConfigAddOptions(prefix+".feed", f, feedInputEnable, feedOutputEnable) |
479 | 478 | staker.L1ValidatorConfigAddOptions(prefix+".validator", f) |
480 | 479 | SeqCoordinatorConfigAddOptions(prefix+".seq-coordinator", f) |
481 | | - das.DataAvailabilityConfigAddOptions(prefix+".data-availability", f) |
| 480 | + das.DataAvailabilityConfigAddNodeOptions(prefix+".data-availability", f) |
482 | 481 | WasmConfigAddOptions(prefix+".wasm", f) |
483 | 482 | SyncMonitorConfigAddOptions(prefix+".sync-monitor", f) |
484 | 483 | DangerousConfigAddOptions(prefix+".dangerous", f) |
@@ -949,7 +948,7 @@ func createNodeImpl( |
949 | 948 | return nil, err |
950 | 949 | } |
951 | 950 | } else { |
952 | | - daReader, dasLifecycleManager, err = SetUpDataAvailability(ctx, &config.DataAvailability, l1Reader, deployInfo) |
| 951 | + daReader, dasLifecycleManager, err = das.CreateDAReaderForNode(ctx, &config.DataAvailability, l1Reader, &deployInfo.SequencerInbox) |
953 | 952 | if err != nil { |
954 | 953 | return nil, err |
955 | 954 | } |
@@ -1129,225 +1128,6 @@ func (n *Node) OnConfigReload(_ *Config, _ *Config) error { |
1129 | 1128 | return nil |
1130 | 1129 | } |
1131 | 1130 |
|
1132 | | -type L1ReaderCloser struct { |
1133 | | - l1Reader *headerreader.HeaderReader |
1134 | | -} |
1135 | | - |
1136 | | -func (c *L1ReaderCloser) Close(_ context.Context) error { |
1137 | | - c.l1Reader.StopOnly() |
1138 | | - return nil |
1139 | | -} |
1140 | | - |
1141 | | -func (c *L1ReaderCloser) String() string { |
1142 | | - return "l1 reader closer" |
1143 | | -} |
1144 | | - |
1145 | | -// SetUpDataAvailabilityWithoutNode sets up a das.DataAvailabilityService stack |
1146 | | -// without relying on any objects already created for setting up the Node. |
1147 | | -func SetUpDataAvailabilityWithoutNode( |
1148 | | - ctx context.Context, |
1149 | | - config *das.DataAvailabilityConfig, |
1150 | | -) (das.DataAvailabilityService, *das.LifecycleManager, error) { |
1151 | | - var l1Reader *headerreader.HeaderReader |
1152 | | - if config.L1NodeURL != "" && config.L1NodeURL != "none" { |
1153 | | - l1Client, err := das.GetL1Client(ctx, config.L1ConnectionAttempts, config.L1NodeURL) |
1154 | | - if err != nil { |
1155 | | - return nil, nil, err |
1156 | | - } |
1157 | | - l1Reader = headerreader.New(l1Client, func() *headerreader.Config { return &headerreader.DefaultConfig }) // TODO: config |
1158 | | - } |
1159 | | - newDas, lifeCycle, err := SetUpDataAvailability(ctx, config, l1Reader, nil) |
1160 | | - if err != nil { |
1161 | | - return nil, nil, err |
1162 | | - } |
1163 | | - if l1Reader != nil { |
1164 | | - l1Reader.Start(ctx) |
1165 | | - lifeCycle.Register(&L1ReaderCloser{l1Reader}) |
1166 | | - } |
1167 | | - return newDas, lifeCycle, err |
1168 | | -} |
1169 | | - |
1170 | | -// SetUpDataAvailability sets up a das.DataAvailabilityService stack allowing |
1171 | | -// some dependencies that were created for the Node to be injected. |
1172 | | -func SetUpDataAvailability( |
1173 | | - ctx context.Context, |
1174 | | - config *das.DataAvailabilityConfig, |
1175 | | - l1Reader *headerreader.HeaderReader, |
1176 | | - deployInfo *RollupAddresses, |
1177 | | -) (das.DataAvailabilityService, *das.LifecycleManager, error) { |
1178 | | - if !config.Enable { |
1179 | | - return nil, nil, nil |
1180 | | - } |
1181 | | - var syncFromStorageServices []*das.IterableStorageService |
1182 | | - var syncToStorageServices []das.StorageService |
1183 | | - var seqInbox *bridgegen.SequencerInbox |
1184 | | - var err error |
1185 | | - var seqInboxCaller *bridgegen.SequencerInboxCaller |
1186 | | - var seqInboxAddress *common.Address |
1187 | | - |
1188 | | - if l1Reader != nil && deployInfo != nil { |
1189 | | - seqInboxAddress = &deployInfo.SequencerInbox |
1190 | | - seqInbox, err = bridgegen.NewSequencerInbox(deployInfo.SequencerInbox, l1Reader.Client()) |
1191 | | - if err != nil { |
1192 | | - return nil, nil, err |
1193 | | - } |
1194 | | - seqInboxCaller = &seqInbox.SequencerInboxCaller |
1195 | | - } else if config.L1NodeURL == "none" && config.SequencerInboxAddress == "none" { |
1196 | | - l1Reader = nil |
1197 | | - seqInboxAddress = nil |
1198 | | - } else if l1Reader != nil && len(config.SequencerInboxAddress) > 0 { |
1199 | | - seqInboxAddress, err = das.OptionalAddressFromString(config.SequencerInboxAddress) |
1200 | | - if err != nil { |
1201 | | - return nil, nil, err |
1202 | | - } |
1203 | | - if seqInboxAddress == nil { |
1204 | | - return nil, nil, errors.New("must provide data-availability.sequencer-inbox-address set to a valid contract address or 'none'") |
1205 | | - } |
1206 | | - seqInbox, err = bridgegen.NewSequencerInbox(*seqInboxAddress, l1Reader.Client()) |
1207 | | - if err != nil { |
1208 | | - return nil, nil, err |
1209 | | - } |
1210 | | - seqInboxCaller = &seqInbox.SequencerInboxCaller |
1211 | | - } else { |
1212 | | - return nil, nil, errors.New("data-availabilty.l1-node-url and sequencer-inbox-address must be set to a valid L1 URL and contract address or 'none' if running daserver executable") |
1213 | | - } |
1214 | | - |
1215 | | - // This function builds up the DataAvailabilityService with the following topology, starting from the leaves. |
1216 | | - /* |
1217 | | - ChainFetchDAS → Bigcache → Redis → |
1218 | | - SignAfterStoreDAS → |
1219 | | - FallbackDAS (if the REST client aggregator was specified) |
1220 | | - (primary) → RedundantStorage (if multiple persistent backing stores were specified) |
1221 | | - → S3 |
1222 | | - → DiskStorage |
1223 | | - → Database |
1224 | | - (fallback only)→ RESTful client aggregator |
1225 | | -
|
1226 | | - → : X--delegates to-->Y |
1227 | | - */ |
1228 | | - topLevelStorageService, dasLifecycleManager, err := das.CreatePersistentStorageService(ctx, config, &syncFromStorageServices, &syncToStorageServices) |
1229 | | - if err != nil { |
1230 | | - return nil, nil, err |
1231 | | - } |
1232 | | - hasPersistentStorage := topLevelStorageService != nil |
1233 | | - |
1234 | | - // Create the REST aggregator if one was requested. If other storage types were enabled above, then |
1235 | | - // the REST aggregator is used as the fallback to them. |
1236 | | - if config.RestfulClientAggregatorConfig.Enable { |
1237 | | - restAgg, err := das.NewRestfulClientAggregator(ctx, &config.RestfulClientAggregatorConfig) |
1238 | | - if err != nil { |
1239 | | - return nil, nil, err |
1240 | | - } |
1241 | | - restAgg.Start(ctx) |
1242 | | - dasLifecycleManager.Register(restAgg) |
1243 | | - |
1244 | | - // Wrap the primary storage service with the fallback to the restful aggregator |
1245 | | - if hasPersistentStorage { |
1246 | | - syncConf := &config.RestfulClientAggregatorConfig.SyncToStorageConfig |
1247 | | - var retentionPeriodSeconds uint64 |
1248 | | - if uint64(syncConf.RetentionPeriod) == math.MaxUint64 { |
1249 | | - retentionPeriodSeconds = math.MaxUint64 |
1250 | | - } else { |
1251 | | - retentionPeriodSeconds = uint64(syncConf.RetentionPeriod.Seconds()) |
1252 | | - } |
1253 | | - if syncConf.Eager { |
1254 | | - if l1Reader == nil || seqInboxAddress == nil { |
1255 | | - return nil, nil, errors.New("l1-node-url and sequencer-inbox-address must be specified along with sync-to-storage.eager") |
1256 | | - } |
1257 | | - topLevelStorageService, err = das.NewSyncingFallbackStorageService( |
1258 | | - ctx, |
1259 | | - topLevelStorageService, |
1260 | | - restAgg, |
1261 | | - l1Reader, |
1262 | | - *seqInboxAddress, |
1263 | | - syncConf) |
1264 | | - if err != nil { |
1265 | | - return nil, nil, err |
1266 | | - } |
1267 | | - } else { |
1268 | | - topLevelStorageService = das.NewFallbackStorageService(topLevelStorageService, restAgg, |
1269 | | - retentionPeriodSeconds, syncConf.IgnoreWriteErrors, true) |
1270 | | - } |
1271 | | - } else { |
1272 | | - topLevelStorageService = das.NewReadLimitedStorageService(restAgg) |
1273 | | - } |
1274 | | - dasLifecycleManager.Register(topLevelStorageService) |
1275 | | - } |
1276 | | - |
1277 | | - var topLevelDas das.DataAvailabilityService |
1278 | | - if config.AggregatorConfig.Enable { |
1279 | | - panic("Tried to make an aggregator using wrong factory method") |
1280 | | - } |
1281 | | - if hasPersistentStorage && (config.KeyConfig.KeyDir != "" || config.KeyConfig.PrivKey != "") { |
1282 | | - _seqInboxCaller := seqInboxCaller |
1283 | | - if config.DisableSignatureChecking { |
1284 | | - _seqInboxCaller = nil |
1285 | | - } |
1286 | | - |
1287 | | - privKey, err := config.KeyConfig.BLSPrivKey() |
1288 | | - if err != nil { |
1289 | | - return nil, nil, err |
1290 | | - } |
1291 | | - |
1292 | | - // TODO rename StorageServiceDASAdapter |
1293 | | - topLevelDas, err = das.NewSignAfterStoreDASWithSeqInboxCaller( |
1294 | | - privKey, |
1295 | | - _seqInboxCaller, |
1296 | | - topLevelStorageService, |
1297 | | - config.ExtraSignatureCheckingPublicKey, |
1298 | | - ) |
1299 | | - if err != nil { |
1300 | | - return nil, nil, err |
1301 | | - } |
1302 | | - } else { |
1303 | | - topLevelDas = das.NewReadLimitedDataAvailabilityService(topLevelStorageService) |
1304 | | - } |
1305 | | - |
1306 | | - // Enable caches, Redis and (local) BigCache. Local is the outermost, so it will be tried first. |
1307 | | - if config.RedisCacheConfig.Enable { |
1308 | | - cache, err := das.NewRedisStorageService(config.RedisCacheConfig, das.NewEmptyStorageService()) |
1309 | | - dasLifecycleManager.Register(cache) |
1310 | | - if err != nil { |
1311 | | - return nil, nil, err |
1312 | | - } |
1313 | | - if config.RedisCacheConfig.SyncFromStorageServices { |
1314 | | - iterableStorageService := das.NewIterableStorageService(das.ConvertStorageServiceToIterationCompatibleStorageService(cache)) |
1315 | | - syncFromStorageServices = append(syncFromStorageServices, iterableStorageService) |
1316 | | - cache = iterableStorageService |
1317 | | - } |
1318 | | - if config.RedisCacheConfig.SyncToStorageServices { |
1319 | | - syncToStorageServices = append(syncToStorageServices, cache) |
1320 | | - } |
1321 | | - topLevelDas = das.NewCacheStorageToDASAdapter(topLevelDas, cache) |
1322 | | - } |
1323 | | - if config.LocalCacheConfig.Enable { |
1324 | | - cache, err := das.NewBigCacheStorageService(config.LocalCacheConfig, das.NewEmptyStorageService()) |
1325 | | - dasLifecycleManager.Register(cache) |
1326 | | - if err != nil { |
1327 | | - return nil, nil, err |
1328 | | - } |
1329 | | - topLevelDas = das.NewCacheStorageToDASAdapter(topLevelDas, cache) |
1330 | | - } |
1331 | | - |
1332 | | - if config.RegularSyncStorageConfig.Enable && len(syncFromStorageServices) != 0 && len(syncToStorageServices) != 0 { |
1333 | | - regularlySyncStorage := das.NewRegularlySyncStorage(syncFromStorageServices, syncToStorageServices, config.RegularSyncStorageConfig) |
1334 | | - regularlySyncStorage.Start(ctx) |
1335 | | - } |
1336 | | - |
1337 | | - if topLevelDas != nil && seqInbox != nil { |
1338 | | - topLevelDas, err = das.NewChainFetchDASWithSeqInbox(topLevelDas, seqInbox) |
1339 | | - if err != nil { |
1340 | | - return nil, nil, err |
1341 | | - } |
1342 | | - } |
1343 | | - |
1344 | | - if topLevelDas == nil { |
1345 | | - return nil, nil, errors.New("data-availability.enable was specified but no Data Availability server types were enabled") |
1346 | | - } |
1347 | | - |
1348 | | - return topLevelDas, dasLifecycleManager, nil |
1349 | | -} |
1350 | | - |
1351 | 1131 | func CreateNode( |
1352 | 1132 | ctx context.Context, |
1353 | 1133 | stack *node.Node, |
|
0 commit comments