Skip to content

Commit a26ba91

Browse files
committed
Merged in coinplugin/go-metadium (pull request #21)
Master
2 parents 95ee424 + a038d3d commit a26ba91

File tree

23 files changed

+943
-70
lines changed

23 files changed

+943
-70
lines changed

cmd/geth/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ var (
183183
utils.PrefetchCount,
184184
utils.LogFlag,
185185
utils.MaxTxsPerBlock,
186+
utils.Hub,
186187
}
187188
)
188189

cmd/geth/usage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ var AppHelpFlagGroups = []flagGroup{
250250
utils.PrefetchCount,
251251
utils.LogFlag,
252252
utils.MaxTxsPerBlock,
253+
utils.Hub,
253254
},
254255
},
255256
{

cmd/utils/flags.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,11 @@ var (
690690
Usage: "Max # of transactions in a block",
691691
Value: params.MaxTxsPerBlock,
692692
}
693+
Hub = cli.StringFlag{
694+
Name: "hub",
695+
Usage: "Id of message hub",
696+
Value: params.Hub,
697+
}
693698
)
694699

695700
// MakeDataDir retrieves the currently requested data directory, terminating
@@ -1385,6 +1390,9 @@ func SetMetadiumConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
13851390
if ctx.GlobalIsSet(MaxTxsPerBlock.Name) {
13861391
params.MaxTxsPerBlock = ctx.GlobalInt(MaxTxsPerBlock.Name)
13871392
}
1393+
if ctx.GlobalIsSet(Hub.Name) {
1394+
params.Hub = ctx.GlobalString(Hub.Name)
1395+
}
13881396

13891397
if params.ConsensusMethod == params.ConsensusInvalid {
13901398
params.ConsensusMethod = params.ConsensusPoW

common/batch/batch.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// batch.go
2+
3+
package batch
4+
5+
import (
6+
"sync/atomic"
7+
"time"
8+
9+
"github.com/ethereum/go-ethereum/log"
10+
)
11+
12+
type Batch struct {
13+
toInterval time.Duration
14+
timeout time.Duration
15+
batchCount int
16+
ch chan interface{}
17+
count int32
18+
f func(interface{}, int) error
19+
}
20+
21+
func NewBatch(toInterval, timeout time.Duration, batchCount int, f func(interface{}, int) error) *Batch {
22+
return &Batch{
23+
toInterval: toInterval,
24+
timeout: timeout,
25+
batchCount: batchCount,
26+
ch: make(chan interface{}, batchCount*10),
27+
count: 0,
28+
f: f,
29+
}
30+
}
31+
32+
func (b *Batch) Run() {
33+
var (
34+
data []interface{}
35+
lt time.Time = time.Now() // last time
36+
ln int = 0 // last count
37+
)
38+
39+
timer := time.NewTimer(0)
40+
<-timer.C // drain the initial timeout
41+
42+
eod := false
43+
for {
44+
itstimer := false
45+
fire := false
46+
47+
select {
48+
case d := <-b.ch:
49+
atomic.AddInt32(&b.count, -1)
50+
if d == nil {
51+
eod = true
52+
} else {
53+
data = append(data, d)
54+
}
55+
case <-timer.C:
56+
itstimer = true
57+
}
58+
last := b.count == 0
59+
60+
if eod {
61+
break
62+
}
63+
64+
// when to fire
65+
// 1. timer fired
66+
// 1.1 no count change
67+
// 1.2 more than 50 ms passed from the initial
68+
// 2. count >= 100
69+
70+
if !itstimer {
71+
if ln == 0 {
72+
lt = time.Now()
73+
ln = len(data)
74+
timer.Stop()
75+
timer.Reset(b.toInterval)
76+
} else if len(data) >= b.batchCount {
77+
fire = true
78+
}
79+
} else if last {
80+
et := time.Since(lt)
81+
if (len(data) == ln && et > b.toInterval) || et > b.timeout {
82+
fire = true
83+
}
84+
}
85+
86+
if fire {
87+
if len(data) < b.batchCount {
88+
// do it
89+
e := b.f(data, len(data))
90+
if e != nil {
91+
log.Error("Metadium Server", "Failed", e)
92+
} else {
93+
log.Debug("Metadium Server", "Count", len(data))
94+
}
95+
data = nil
96+
} else {
97+
for {
98+
if len(data) < b.batchCount {
99+
break
100+
}
101+
102+
// do it
103+
e := b.f(data, b.batchCount)
104+
if e != nil {
105+
log.Error("Metadium Server", "Failed", e)
106+
} else {
107+
log.Debug("Metadium Server", "Count", b.batchCount)
108+
}
109+
data = data[b.batchCount:]
110+
}
111+
}
112+
}
113+
114+
lt = time.Now()
115+
ln = len(data)
116+
117+
if itstimer && ln != 0 {
118+
timer.Reset(b.toInterval)
119+
} else if !itstimer && ln == 0 {
120+
timer.Stop()
121+
}
122+
}
123+
124+
// got eod, flush the remaining data
125+
for len(data) > 0 {
126+
l := len(data)
127+
if l > b.batchCount {
128+
l = b.batchCount
129+
}
130+
e := b.f(data, l)
131+
if e != nil {
132+
log.Error("Metadium Server", "Failed", e)
133+
} else {
134+
log.Debug("Metadium Server", "Count", l)
135+
}
136+
data = data[l:]
137+
}
138+
}
139+
140+
func (b *Batch) Stop() {
141+
b.ch <- nil
142+
}
143+
144+
func (b *Batch) Put(data interface{}) {
145+
b.ch <- data
146+
atomic.AddInt32(&b.count, 1)
147+
}
148+
149+
// EOF

metadium/miner/lrucache.go renamed to common/lru/lrucache.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// lrucache.go
22

3-
package miner
3+
package lru
44

55
import (
66
"container/list"
@@ -9,22 +9,31 @@ import (
99

1010
type LruCache struct {
1111
lock *sync.RWMutex
12-
max int
13-
count int
12+
max int // max count
13+
count int // current count
14+
fifo bool // if true, element order is not updated on access
1415
lru *list.List
1516
data map[interface{}]interface{}
1617
}
1718

18-
func NewLruCache(max int) *LruCache {
19+
// NewLruCache creates LruCache
20+
func NewLruCache(max int, fifo bool) *LruCache {
1921
return &LruCache{
2022
lock: &sync.RWMutex{},
2123
max: max,
2224
count: 0,
25+
fifo: fifo,
2326
lru: list.New(),
2427
data: map[interface{}]interface{}{},
2528
}
2629
}
2730

31+
// Count returns the current count of elements
32+
func (c *LruCache) Count() int {
33+
return c.count
34+
}
35+
36+
// Put adds a key-value pair
2837
func (c *LruCache) Put(key, value interface{}) {
2938
c.lock.Lock()
3039
defer c.lock.Unlock()
@@ -49,6 +58,7 @@ func (c *LruCache) Put(key, value interface{}) {
4958
}
5059
}
5160

61+
// Get returns a value with the given key if present, nil otherwise.
5262
func (c *LruCache) Get(key interface{}) interface{} {
5363
c.lock.RLock()
5464
defer c.lock.RUnlock()
@@ -58,18 +68,22 @@ func (c *LruCache) Get(key interface{}) interface{} {
5868
return nil
5969
} else {
6070
e := _e.(*list.Element)
61-
c.lru.MoveToFront(e)
71+
if !c.fifo {
72+
c.lru.MoveToFront(e)
73+
}
6274
return e.Value.([]interface{})[1]
6375
}
6476
}
6577

78+
// Exists checks if a key exists.
6679
func (c *LruCache) Exists(key interface{}) bool {
6780
c.lock.RLock()
6881
defer c.lock.RUnlock()
6982
_, ok := c.data[key]
7083
return ok
7184
}
7285

86+
// Del deletes a key-value pair if present. It returns true iff it's present.
7387
func (c *LruCache) Del(key interface{}) bool {
7488
c.lock.Lock()
7589
defer c.lock.Unlock()
@@ -87,4 +101,14 @@ func (c *LruCache) Del(key interface{}) bool {
87101
}
88102
}
89103

104+
// Clear resets the lru
105+
func (c *LruCache) Clear() {
106+
c.lock.Lock()
107+
defer c.lock.Unlock()
108+
109+
c.count = 0
110+
c.lru = list.New()
111+
c.data = map[interface{}]interface{}{}
112+
}
113+
90114
// EOF

0 commit comments

Comments
 (0)