Skip to content

Commit dbe3d7e

Browse files
authored
add aof_writer cmd_writer json_writer (#914)
1 parent 85485fc commit dbe3d7e

5 files changed

Lines changed: 256 additions & 0 deletions

File tree

cmd/redis-shake/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ func main() {
139139
// create writer
140140
var theWriter writer.Writer
141141
switch {
142+
case v.IsSet("file_writer"):
143+
opts := new(writer.FileWriterOptions)
144+
defaults.SetDefaults(opts)
145+
err := v.UnmarshalKey("file_writer", opts)
146+
if err != nil {
147+
log.Panicf("failed to read the FileWriter config entry. err: %v", err)
148+
}
149+
theWriter = writer.NewFileWriter(ctx, opts)
142150
case v.IsSet("redis_writer"):
143151
opts := new(writer.RedisWriterOptions)
144152
defaults.SetDefaults(opts)

docs/src/en/writer/file_writer.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# file_writer
2+
3+
## Introduction
4+
5+
Use `file_writer` to write data to a file in CMD, JSON, or AOF format.
6+
It is commonly used to extract/migrate/fix data by file.
7+
8+
## configuration
9+
10+
```toml
11+
[file_writer]
12+
filepath = "/tmp/cmd.txt"
13+
type = "cmd" #cmd,aof,json (default cmd)
14+
```
15+
16+
* An absolute filepath should be passed in.
17+
## application scenarios
18+
- share data between two systems: one system writes an AOF file to disk/S3/OSS, and another system reads the file from there.
19+
- partially migrate data by business prefix: extract the data with prefix "XXX:" from system A as an AOF file, then import it into system B with the command `redis-cli --pipe < XXX.aof` (`--pipe` reads the protocol stream from standard input).
20+
- fix data by cmd file: export cmd data from one system, fix wrong data, and then import cmd file with command `redis-cli < cmd.txt`.
21+
- analyze data with JSON: export a JSON file, then import it into MongoDB or a BI tool for analysis.
22+
23+
## example output:
24+
### cmd_writer output:
25+
```
26+
SELECT 0
27+
set key1 1
28+
set key2 2
29+
set key3 3
30+
sadd key4 1 2 3 4
31+
lpush key5 1 2 3 4 5
32+
zadd key6 1 2 3 4 5 6
33+
```
34+
### json_writer output:
35+
```
36+
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
37+
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
38+
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
39+
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
40+
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
41+
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
42+
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
43+
```
44+
### aof_writer output:
45+
```
46+
*2
47+
$6
48+
SELECT
49+
$1
50+
0
51+
*3
52+
$3
53+
set
54+
$4
55+
key1
56+
$1
57+
1
58+
```

docs/src/zh/writer/file_writer.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# file_writer
2+
3+
## 介绍
4+
5+
可以使用 `file_writer` 写文件, 可写的格式有 CMD/JSON/AOF, 常用于通过文件介质抽取/迁移/订正数据.
6+
## 配置
7+
8+
```toml
9+
[file_writer]
10+
filepath = "/tmp/cmd.txt"
11+
type = "cmd" #cmd,aof,json (default cmd)
12+
```
13+
14+
* 绝对路径 filepath 是必填的.
15+
16+
## 应用场景
17+
- 俩系统共享数据: 一个系统把文件写到 disk/s3/oss, 另一系统从中读取.
18+
- 跨系统局部迁移带指定前缀的数据: 从A系统迁出带前缀"XXX:"的数据, B系统通过命令导入这些数据 `redis-cli --pipe < XXX.aof` (`--pipe` 从标准输入读取协议数据).
19+
- 通过命令文件订正数据: 从一个系统中导出数据成cmd格式, 订正后再导入命令`redis-cli < cmd.txt`.
20+
- 通过json格式做数据分析: 导出成json文件, 导入到mongodb/bi做分析.
21+
22+
## 示例输出
23+
### cmd_writer 输出:
24+
```
25+
SELECT 0
26+
set key1 1
27+
set key2 2
28+
set key3 3
29+
sadd key4 1 2 3 4
30+
lpush key5 1 2 3 4 5
31+
zadd key6 1 2 3 4 5 6
32+
```
33+
### json_writer 输出:
34+
```
35+
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
36+
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
37+
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
38+
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
39+
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
40+
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
41+
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
42+
```
43+
### aof_writer 输出:
44+
```
45+
*2
46+
$6
47+
SELECT
48+
$1
49+
0
50+
*3
51+
$3
52+
set
53+
$4
54+
key1
55+
$1
56+
1
57+
```

internal/writer/file_writer.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package writer
2+
3+
import (
4+
"RedisShake/internal/entry"
5+
"RedisShake/internal/log"
6+
"bufio"
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"os"
11+
"path/filepath"
12+
"strings"
13+
"sync"
14+
"time"
15+
)
16+
17+
// FileType names an output format that the file writer can produce.
type FileType string
18+
19+
// Output formats accepted by the file writer; the string values are the ones
// used for the "type" option of FileWriterOptions.
const (
20+
AOF FileType = "aof"
21+
CMD FileType = "cmd"
22+
JSON FileType = "json"
23+
)
24+
25+
// FileTypes lists every FileType constant declared above.
var FileTypes = []FileType{CMD, AOF, JSON}
26+
27+
// FileWriterOptions holds the settings decoded from the "file_writer"
// configuration entry.
type FileWriterOptions struct {
28+
// Filepath is the location of the output file; NewFileWriter resolves it
// with filepath.Abs.
Filepath string `mapstructure:"filepath" default:""`
29+
// FileType selects the output format by name ("cmd", "aof" or "json");
// the default tag sets it to "cmd".
FileType string `mapstructure:"type" default:"cmd"`
30+
}
31+
32+
// fileWriter exports entries to a single file. Entries are handed over through
// ch and written by the processWrite goroutine.
type fileWriter struct {
33+
// fileType selects the encoding used by writeEntry.
fileType FileType
34+
// path is the absolute path of the output file.
path string
35+
// DbId is set to 0 by NewFileWriter; no code visible in this file reads it.
DbId int
36+
// ch carries entries from Write to processWrite; Close closes it.
ch chan *entry.Entry
37+
// chWg lets Close wait for the processWrite goroutine.
chWg sync.WaitGroup
38+
// stat is the value returned by Status.
stat struct {
39+
// EntryCount is incremented once per entry received by processWrite.
EntryCount int `json:"entry_count"`
40+
}
41+
}
42+
43+
func (w *fileWriter) Write(e *entry.Entry) {
44+
w.ch <- e
45+
}
46+
47+
func (w *fileWriter) Close() {
48+
close(w.ch)
49+
w.chWg.Wait()
50+
}
51+
52+
func (w *fileWriter) Status() interface{} {
53+
return w.stat
54+
}
55+
56+
func (w *fileWriter) StatusString() string {
57+
return fmt.Sprintf("exported entry count=%d", w.stat.EntryCount)
58+
}
59+
60+
func (w *fileWriter) StatusConsistent() bool {
61+
return true
62+
}
63+
64+
func NewFileWriter(ctx context.Context, opts *FileWriterOptions) Writer {
65+
absolutePath, err := filepath.Abs(opts.Filepath)
66+
if err != nil {
67+
log.Panicf("NewFileWriter path=[%s]: filepath.Abs error: %s", opts.Filepath, err.Error())
68+
}
69+
log.Infof("NewFileWriter absolute path=[%s],type=[%s]", absolutePath, opts.FileType)
70+
w := &fileWriter{
71+
fileType: FileType(opts.FileType),
72+
DbId: 0,
73+
path: absolutePath,
74+
ch: make(chan *entry.Entry),
75+
}
76+
w.stat.EntryCount = 0
77+
return w
78+
}
79+
80+
func (w *fileWriter) StartWrite(ctx context.Context) (ch chan *entry.Entry) {
81+
w.chWg = sync.WaitGroup{}
82+
w.chWg.Add(1)
83+
go w.processWrite(ctx)
84+
return w.ch
85+
86+
}
87+
88+
func (w *fileWriter) processWrite(ctx context.Context) {
89+
ticker := time.NewTicker(10 * time.Millisecond)
90+
defer ticker.Stop()
91+
file, err := os.Create(w.path)
92+
if err != nil {
93+
log.Panicf("create file failed:", err)
94+
return
95+
}
96+
defer file.Close()
97+
writer := bufio.NewWriter(file)
98+
for {
99+
select {
100+
case <-ctx.Done():
101+
// do nothing until w.ch is closed
102+
case <-ticker.C:
103+
writer.Flush()
104+
case e, ok := <-w.ch:
105+
if !ok {
106+
w.chWg.Done()
107+
writer.Flush()
108+
return
109+
}
110+
w.stat.EntryCount++
111+
w.writeEntry(writer, e)
112+
}
113+
}
114+
}
115+
116+
func (w *fileWriter) writeEntry(writer *bufio.Writer, e *entry.Entry) {
117+
switch w.fileType {
118+
case CMD:
119+
writer.WriteString(strings.Join(e.Argv, " ") + "\n")
120+
case AOF:
121+
writer.Write(e.Serialize())
122+
case JSON:
123+
// compute SerializeSize for json result
124+
e.Serialize()
125+
json, _ := json.Marshal(e)
126+
writer.Write(json)
127+
writer.WriteString("\n")
128+
}
129+
}

shake.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ password = "" # keep empty if no authentication is required
3535
tls = false
3636
off_reply = false # turn off the server reply
3737

38+
# [file_writer]
39+
# filepath = "/tmp/cmd.txt"
40+
# type = "cmd" #cmd,aof,json (default cmd)
41+
3842
[filter]
3943
# Allow keys with specific prefixes or suffixes
4044
# Examples:

0 commit comments

Comments
 (0)