Skip to content

Commit e64d4a0

Browse files
authored
chore(job): serialize a struct to bytes (#103)
BenchmarkEncode BenchmarkEncode/JSON BenchmarkEncode/JSON-2 6139400 189.3 ns/op 96 B/op 1 allocs/op BenchmarkEncode/JSON-2 6316482 189.1 ns/op 96 B/op 1 allocs/op BenchmarkEncode/UnsafeCast BenchmarkEncode/UnsafeCast-2 1000000000 0.5096 ns/op 0 B/op 0 allocs/op BenchmarkEncode/UnsafeCast-2 1000000000 0.5098 ns/op 0 B/op 0 allocs/op BenchmarkDecode BenchmarkDecode/JSON BenchmarkDecode/JSON-2 3469248 344.1 ns/op 99 B/op 2 allocs/op BenchmarkDecode/JSON-2 3480768 343.9 ns/op 99 B/op 2 allocs/op BenchmarkDecode/UnsafeCast BenchmarkDecode/UnsafeCast-2 1000000000 0.4021 ns/op 0 B/op 0 allocs/op BenchmarkDecode/UnsafeCast-2 1000000000 0.4021 ns/op 0 B/op 0 allocs/op
1 parent bd08b4a commit e64d4a0

File tree

7 files changed

+127
-81
lines changed

7 files changed

+127
-81
lines changed

benchmark_test.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package queue
22

33
import (
44
"context"
5-
"log"
65
"testing"
76
"time"
87

@@ -55,59 +54,63 @@ func BenchmarkQueueTask(b *testing.B) {
5554
)
5655
b.ReportAllocs()
5756
b.ResetTimer()
57+
58+
m := job.NewTask(func(context.Context) error {
59+
return nil
60+
})
61+
5862
for n := 0; n < b.N; n++ {
59-
err := q.QueueTask(func(context.Context) error {
60-
return nil
61-
})
62-
if err != nil {
63-
log.Fatal(err)
63+
if err := q.queue(m); err != nil {
64+
b.Fatal(err)
6465
}
6566
}
6667
}
6768

6869
func BenchmarkQueue(b *testing.B) {
69-
m := &mockMessage{
70-
message: "foo",
71-
}
7270
w := NewRing()
7371
q, _ := NewQueue(
7472
WithWorker(w),
7573
WithLogger(emptyLogger{}),
7674
)
7775
b.ReportAllocs()
7876
b.ResetTimer()
79-
for n := 0; n < b.N; n++ {
80-
err := q.Queue(m)
81-
if err != nil {
82-
log.Fatal(err)
83-
}
84-
}
85-
}
86-
87-
func BenchmarkRingPayload(b *testing.B) {
88-
b.ReportAllocs()
89-
90-
task := &job.Message{
91-
Timeout: 100 * time.Millisecond,
92-
Payload: []byte(`{"timeout":3600000000000}`),
93-
}
94-
w := NewRing(
95-
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
96-
return nil
97-
}),
98-
)
9977

100-
q, _ := NewQueue(
101-
WithWorker(w),
102-
WithLogger(emptyLogger{}),
103-
)
78+
m := job.NewMessage(&mockMessage{
79+
message: "foo",
80+
})
81+
m.Encode()
10482

10583
for n := 0; n < b.N; n++ {
106-
_ = q.run(task)
84+
if err := q.queue(m); err != nil {
85+
b.Fatal(err)
86+
}
10787
}
10888
}
10989

110-
func BenchmarkRingTask(b *testing.B) {
90+
// func BenchmarkRingPayload(b *testing.B) {
91+
// b.ReportAllocs()
92+
93+
// task := &job.Message{
94+
// Timeout: 100 * time.Millisecond,
95+
// Payload: []byte(`{"timeout":3600000000000}`),
96+
// }
97+
// w := NewRing(
98+
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
99+
// return nil
100+
// }),
101+
// )
102+
103+
// q, _ := NewQueue(
104+
// WithWorker(w),
105+
// WithLogger(emptyLogger{}),
106+
// )
107+
108+
// for n := 0; n < b.N; n++ {
109+
// _ = q.run(task)
110+
// }
111+
// }
112+
113+
func BenchmarkRingWithTask(b *testing.B) {
111114
b.ReportAllocs()
112115

113116
task := &job.Message{

core/worker.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package core
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
// Worker interface
68
type Worker interface {

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/golang-queue/queue
33
go 1.18
44

55
require (
6-
github.com/goccy/go-json v0.10.0
76
github.com/golang/mock v1.6.0
87
github.com/stretchr/testify v1.8.1
98
go.uber.org/goleak v1.2.0

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
44
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5-
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
6-
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
75
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
86
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
97
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=

job/job.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package job
33
import (
44
"context"
55
"time"
6+
"unsafe"
67

7-
"github.com/goccy/go-json"
88
"github.com/golang-queue/queue/core"
99
)
1010

@@ -30,30 +30,23 @@ type Message struct {
3030
// RetryDelay set delay between retry
3131
// default is 100ms
3232
RetryDelay time.Duration `json:"retry_delay"`
33-
}
3433

35-
// Bytes get string body
36-
func (m *Message) Bytes() []byte {
37-
if m.Task != nil {
38-
return nil
39-
}
40-
return m.Payload
34+
// Data to save Unsafe cast
35+
Data []byte
4136
}
4237

43-
// Encode for encoding the structure
44-
func (m *Message) Encode() []byte {
45-
b, _ := json.Marshal(m)
38+
const (
39+
movementSize = int(unsafe.Sizeof(Message{}))
40+
)
4641

47-
return b
42+
// Bytes get internal data
43+
func (m *Message) Bytes() []byte {
44+
return m.Data
4845
}
4946

50-
// Rest for reset default value
51-
func (m *Message) Rest() {
52-
m.Task = nil
53-
m.Payload = nil
54-
m.RetryCount = 0
55-
m.Timeout = 0
56-
m.RetryDelay = 0
47+
// Encode for encoding the structure
48+
func (m *Message) Encode() {
49+
m.Data = Encode(m)
5750
}
5851

5952
func NewMessage(m core.QueuedMessage, opts ...AllowOption) *Message {
@@ -77,3 +70,11 @@ func NewTask(task TaskFunc, opts ...AllowOption) *Message {
7770
Task: task,
7871
}
7972
}
73+
74+
func Encode(m *Message) []byte {
75+
return (*[movementSize]byte)(unsafe.Pointer(m))[:]
76+
}
77+
78+
func Decode(m []byte) *Message {
79+
return (*Message)(unsafe.Pointer(&m[0]))
80+
}

job/job_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,53 @@
11
package job
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestTaskBinaryEncode(t *testing.T) {
12+
m := NewTask(func(context.Context) error {
13+
return nil
14+
},
15+
AllowOption{
16+
RetryCount: Int64(100),
17+
RetryDelay: Time(30 * time.Millisecond),
18+
Timeout: Time(3 * time.Millisecond),
19+
},
20+
)
21+
22+
out := Decode(Encode(m))
23+
24+
assert.Equal(t, int64(100), out.RetryCount)
25+
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
26+
}
27+
28+
type mockMessage struct {
29+
message string
30+
}
31+
32+
func (m mockMessage) Bytes() []byte {
33+
return []byte(m.message)
34+
}
35+
36+
func TestMessageBinaryEncode(t *testing.T) {
37+
m := NewMessage(&mockMessage{
38+
message: "foo",
39+
},
40+
AllowOption{
41+
RetryCount: Int64(100),
42+
RetryDelay: Time(30 * time.Millisecond),
43+
Timeout: Time(3 * time.Millisecond),
44+
},
45+
)
46+
47+
m.Encode()
48+
out := Decode(m.Bytes())
49+
50+
assert.Equal(t, int64(100), out.RetryCount)
51+
assert.Equal(t, 30*time.Millisecond, out.RetryDelay)
52+
assert.Equal(t, "foo", string(out.Payload))
53+
}

queue.go

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"sync/atomic"
88
"time"
99

10-
"github.com/goccy/go-json"
1110
"github.com/golang-queue/queue/core"
1211
"github.com/golang-queue/queue/job"
1312
)
@@ -113,35 +112,26 @@ func (q *Queue) Wait() {
113112
q.routineGroup.Wait()
114113
}
115114

116-
// Queue to queue all job
117-
func (q *Queue) Queue(m core.QueuedMessage, opts ...job.AllowOption) error {
118-
if atomic.LoadInt32(&q.stopFlag) == 1 {
119-
return ErrQueueShutdown
120-
}
121-
122-
message := job.NewMessage(m, opts...)
123-
payload := message.Encode()
124-
message.Rest()
125-
message.Payload = payload
126-
127-
if err := q.worker.Queue(message); err != nil {
128-
return err
129-
}
115+
// Queue to queue single job with binary
116+
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error {
117+
data := job.NewMessage(message, opts...)
118+
data.Encode()
130119

131-
q.metric.IncSubmittedTask()
132-
133-
return nil
120+
return q.queue(data)
134121
}
135122

136-
// QueueTask to queue job task
123+
// QueueTask to queue single task
137124
func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error {
125+
data := job.NewTask(task, opts...)
126+
return q.queue(data)
127+
}
128+
129+
func (q *Queue) queue(m *job.Message) error {
138130
if atomic.LoadInt32(&q.stopFlag) == 1 {
139131
return ErrQueueShutdown
140132
}
141133

142-
message := job.NewTask(task, opts...)
143-
144-
if err := q.worker.Queue(message); err != nil {
134+
if err := q.worker.Queue(m); err != nil {
145135
return err
146136
}
147137

@@ -178,7 +168,8 @@ func (q *Queue) work(task core.QueuedMessage) {
178168
func (q *Queue) run(task core.QueuedMessage) error {
179169
data := task.(*job.Message)
180170
if data.Task == nil {
181-
_ = json.Unmarshal(task.Bytes(), data)
171+
data = job.Decode(task.Bytes())
172+
data.Data = data.Payload
182173
}
183174

184175
return q.handle(data)

0 commit comments

Comments
 (0)