|
15 | 15 | package history |
16 | 16 |
|
17 | 17 | import ( |
| 18 | + "crypto/rand" |
18 | 19 | "encoding/json" |
19 | 20 | "testing" |
20 | 21 |
|
21 | 22 | "github.com/emitter-io/emitter/internal/message" |
| 23 | + "github.com/emitter-io/emitter/internal/network/mqtt" |
22 | 24 | "github.com/emitter-io/emitter/internal/provider/storage" |
23 | 25 | "github.com/emitter-io/emitter/internal/security" |
24 | 26 | "github.com/emitter-io/emitter/internal/service/fake" |
@@ -82,3 +84,115 @@ func TestHistory(t *testing.T) { |
82 | 84 | // The response should have returned the last 2 messages. |
83 | 85 | assert.Equal(t, 2, len(response.(*Response).Messages)) |
84 | 86 | } |
| 87 | + |
| 88 | +func TestLargeMessage(t *testing.T) { |
| 89 | + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} |
| 90 | + store := storage.NewInMemory(nil) |
| 91 | + store.Configure(nil) |
| 92 | + auth := &fake.Authorizer{ |
| 93 | + Success: true, |
| 94 | + Contract: uint32(1), |
| 95 | + ExtraPerm: security.AllowLoad, |
| 96 | + } |
| 97 | + // Create new service |
| 98 | + service := New(auth, store) |
| 99 | + connection := &fake.Conn{} |
| 100 | + |
| 101 | + // The most basic request, on an empty store. |
| 102 | + request := &Request{ |
| 103 | + Key: "key", |
| 104 | + Channel: "key/a/b/c/", |
| 105 | + } |
| 106 | + |
| 107 | + // Store 1 long message |
| 108 | + // Keep in mind the message will be composed of the ID and the channel size on top of the payload. |
| 109 | + // So mqttMaxMessageSize is really smaller than the actual message size. |
| 110 | + randomBytes := make([]byte, mqtt.MaxMessageSize) |
| 111 | + rand.Read(randomBytes) |
| 112 | + firstSSID := message.NewID(ssid) |
| 113 | + store.Store(&message.Message{ |
| 114 | + ID: firstSSID, |
| 115 | + Channel: []byte("a/b/c/"), |
| 116 | + Payload: randomBytes, |
| 117 | + TTL: 30, |
| 118 | + }) |
| 119 | + |
| 120 | + reqBytes, _ := json.Marshal(request) |
| 121 | + |
| 122 | + // Issue the same request |
| 123 | + response, ok := service.OnRequest(connection, reqBytes) |
| 124 | + // The request should have succeeded and returned a response. |
| 125 | + assert.Equal(t, true, ok) |
| 126 | + // The response should have returned the last message as per MQTT spec. |
| 127 | + assert.Equal(t, 0, len(response.(*Response).Messages)) |
| 128 | +} |
| 129 | + |
| 130 | +// ONLY PASSES BECAUSE OF THE BUG, THERE IS ONLY ONE SERVER SO NO GATHER |
| 131 | +// match.Limit(limit) only limits based on the number of messages not the size of the frame |
| 132 | +/*func (s *SSD) Query(ssid message.Ssid, from, until time.Time, startFromID message.ID, limit int) (message.Frame, error) { |
| 133 | +
|
| 134 | + // Construct a query and lookup locally first |
| 135 | + query := newLookupQuery(ssid, from, until, startFromID, limit) |
| 136 | + match := s.lookup(query) |
| 137 | +
|
| 138 | + // Issue the message survey to the cluster |
| 139 | + if req, err := binary.Marshal(query); err == nil && s.survey != nil { |
| 140 | + if awaiter, err := s.survey.Query("ssdstore", req); err == nil { |
| 141 | +
|
| 142 | + // Wait for all presence updates to come back (or a deadline) |
| 143 | + for _, resp := range awaiter.Gather(2000 * time.Millisecond) { |
| 144 | + if frame, err := message.DecodeFrame(resp); err == nil { |
| 145 | + match = append(match, frame...) |
| 146 | + } |
| 147 | + } |
| 148 | + } |
| 149 | + } |
| 150 | +
|
| 151 | + match.Limit(limit) |
| 152 | + return match, nil |
| 153 | +}*/ |
| 154 | +func TestSumOfTwoExceedMaxSize(t *testing.T) { |
| 155 | + ssid := message.Ssid{1, 3238259379, 500706888, 1027807523} |
| 156 | + store := storage.NewInMemory(nil) |
| 157 | + store.Configure(nil) |
| 158 | + auth := &fake.Authorizer{ |
| 159 | + Success: true, |
| 160 | + Contract: uint32(1), |
| 161 | + ExtraPerm: security.AllowLoad, |
| 162 | + } |
| 163 | + // Create new service |
| 164 | + service := New(auth, store) |
| 165 | + connection := &fake.Conn{} |
| 166 | + |
| 167 | + // The most basic request, on an empty store. |
| 168 | + request := &Request{ |
| 169 | + Key: "key", |
| 170 | + Channel: "key/a/b/c/", |
| 171 | + } |
| 172 | + |
| 173 | + // Store 2 messages |
| 174 | + randomBytes := make([]byte, int(mqtt.MaxMessageSize/2)) |
| 175 | + rand.Read(randomBytes) |
| 176 | + firstSSID := message.NewID(ssid) |
| 177 | + store.Store(&message.Message{ |
| 178 | + ID: firstSSID, |
| 179 | + Channel: []byte("a/b/c/"), |
| 180 | + Payload: randomBytes, |
| 181 | + TTL: 30, |
| 182 | + }) |
| 183 | + store.Store(&message.Message{ |
| 184 | + ID: message.NewID(ssid), |
| 185 | + Channel: []byte("a/b/c/"), |
| 186 | + Payload: randomBytes, |
| 187 | + TTL: 30, |
| 188 | + }) |
| 189 | + reqBytes, _ := json.Marshal(request) |
| 190 | + |
| 191 | + request.Channel = "key/a/b/c/?last=2" |
| 192 | + reqBytes, _ = json.Marshal(request) |
| 193 | + response, ok := service.OnRequest(connection, reqBytes) |
| 194 | + // The request should have succeeded and returned a response. |
| 195 | + assert.Equal(t, true, ok) |
| 196 | + // The response should have returned the last 2 messages. |
| 197 | + assert.Equal(t, 1, len(response.(*Response).Messages)) |
| 198 | +} |
0 commit comments