-
Notifications
You must be signed in to change notification settings - Fork 624
STREAMS API in Garnet #1131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
TalZaccai
merged 61 commits into
microsoft:research
from
ramananeesh:ramananeesh/streams
Jul 10, 2025
Merged
STREAMS API in Garnet #1131
Changes from 28 commits
Commits
Show all changes
61 commits
Select commit
Hold shift + click to select a range
75f20bd
Streams API Work in progress
ramananeesh 73b3f20
Streams - sector aligned bufferpool for index work in progress
ramananeesh b4723ec
sector aligned bufferpool integration works
ramananeesh 1068e53
testing code wip
ramananeesh 53e204b
bug fixed
ramananeesh c64785a
fixed major bug in btree
ramananeesh 9a6099e
updates
ramananeesh edaa741
fixed memory alloc by reverting to Marshal.AllocHGlobal
ramananeesh 7710e56
code cleanup
ramananeesh 0129aca
delete passes
ramananeesh fab86a8
Merge branch 'main' of github.com:ramananeesh/garnet into ramananeesh…
ramananeesh 1be22c6
tying up the streams API
ramananeesh d8a41f4
added command to RespCommandsInfo
ramananeesh 70f3bac
basic test with XADD and XLEN works
ramananeesh 460d88f
added XRANGE and XDEL - basic tests work
ramananeesh 579bd6e
fixed bug with stream id parsing/encode/decode
ramananeesh 40af237
added ACL Tests
ramananeesh 83acdc7
added RespStreamTests
ramananeesh 5444205
minor cleanup
ramananeesh 5cab8b7
added cleaner command strings for incorrect arguments
ramananeesh d3551d8
cleanup; removed TRIM code
ramananeesh dcd33b9
cleanup; added more documentation about node structure
ramananeesh 8003334
more cleanup
ramananeesh 5f92120
reverted visibility change to ByteArrayComparer to instead use .Instance
ramananeesh ede575d
removed most of Unsafe.AsPointer() instances from BTree code
ramananeesh 6e4f239
swapped out memory allocation for B-tree to use NativeMemory.AlignedA…
ramananeesh ce15667
cleanup; removed unused imports + commented out code.
ramananeesh e66d113
bug fix - mismatch in allocates v/s deallocates for B-tree
ramananeesh 70bed70
updates fixing PR comments
ramananeesh b356164
reverted an earlier change in Build.props following PR comment + chan…
ramananeesh c45bfd9
Merge branch 'main' of github.com:ramananeesh/garnet into ramananeesh…
ramananeesh 9d9c995
applied changes to propagate modified RESP3 output
ramananeesh 925dbc7
added streams as an optional
ramananeesh ff74ba0
formatting fixes
ramananeesh 04acc5d
minor edit
ramananeesh 859f07b
fixed minor bug in keeping streams optional
ramananeesh 9415e90
cleanup
ramananeesh 6b36d88
updated XADD to return bulk string
ramananeesh 3a89868
minor edits to error messages; used WriteAsciiBulkString for stream i…
ramananeesh f027ed6
added back code for BTreeTrim (work in progress)
ramananeesh c52bc73
fixed validCount bug in TrimByLength
ramananeesh 2b34ff6
fuxed bug on validCount propagation + trimByID
ramananeesh 91f93c2
minor bug fix; added BTreeTrimTests
ramananeesh ca7d283
return headValidKey as ReadOnlySpan<byte>
ramananeesh 4ff5394
cleanup
ramananeesh 0961e1b
connecting XTRIM to Streams API
ramananeesh d6f4e69
added XTrimTests
ramananeesh dfc2963
stream commands error out in case streams is disabled
ramananeesh da0c52f
cleanup + switched to ReadOnlySpan<byte> instead of byte[] where appl…
ramananeesh 4d946f5
matched error messages to Redis documentation
ramananeesh 06a3182
added NOMKSTREAM support for XADD
ramananeesh bf6b291
updated docs
ramananeesh 4b0f911
minor cleanup + refactor variable name
ramananeesh c121a73
minor edit
ramananeesh 5044f36
Merge branch 'main' of github.com:ramananeesh/garnet into ramananeesh…
ramananeesh fff8af0
minor bug fix - enableStreams for ACL tests in server setup
ramananeesh 2796bda
support exact and approximate trimming of index
ramananeesh a2e6c1e
added support for approximate trimming
ramananeesh 24c65eb
Merge branch 'main' of github.com:ramananeesh/garnet into ramananeesh…
ramananeesh 0265078
minor doc update
ramananeesh 8842e92
Merge branch 'microsoft:main' into ramananeesh/streams
ramananeesh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,177 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Runtime.InteropServices; | ||
|
|
||
| namespace Garnet.server.BTreeIndex | ||
| { | ||
| public unsafe partial class BTree | ||
| { | ||
| BTreeNode* root; | ||
| BTreeNode* head; | ||
| BTreeNode* tail; | ||
| byte* tailMinKey; | ||
| public static readonly int MAX_TREE_DEPTH = 10; // maximum allowed depth of the tree | ||
| static int DEFAULT_SPLIT_LEAF_POSITION = (BTreeNode.LEAF_CAPACITY + 1) / 2; // position at which leaf node is split | ||
| static int SPLIT_LEAF_POSITION = BTreeNode.LEAF_CAPACITY; // position at which leaf node is split | ||
| static int SPLIT_INTERNAL_POSITION = BTreeNode.INTERNAL_CAPACITY; // position at which internal node is split | ||
|
|
||
| BTreeNode*[] rootToTailLeaf; // array of nodes from root to tail leaf | ||
| public BTreeStats stats; // statistics about the tree | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="BTree"/> class. | ||
| /// </summary> | ||
| public BTree(uint sectorSize) | ||
| { | ||
| var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE); | ||
| root = BTreeNode.Create(BTreeNodeType.Leaf, memoryBlock); | ||
| head = tail = root; | ||
| root->info->next = root->info->previous = null; | ||
| root->info->count = 0; | ||
| tailMinKey = null; | ||
| rootToTailLeaf = new BTreeNode*[MAX_TREE_DEPTH]; | ||
| stats = new BTreeStats(); | ||
| stats.depth = 1; | ||
| stats.numLeafNodes = 1; | ||
| stats.numAllocates = 1; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Frees the memory allocated for a node | ||
| /// </summary> | ||
| /// <param name="node">BTreeNode to free from memory</param> | ||
| private void Free(ref BTreeNode* node) | ||
| { | ||
| if (node == null) | ||
| return; | ||
|
|
||
| // If this is an internal node, free all its children first | ||
| if (node->info->type == BTreeNodeType.Internal) | ||
| { | ||
| for (int i = 0; i <= node->info->count; i++) | ||
| { | ||
| var child = node->data.children[i]; | ||
| Free(ref child); | ||
| node->data.children[i] = null; | ||
| } | ||
| } | ||
|
|
||
| // Free the memory handle | ||
| if (node->memoryHandle != null) | ||
| { | ||
| NativeMemory.Free(node->memoryHandle); | ||
| stats.numDeallocates++; | ||
| node = null; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Frees the memory allocated for a node | ||
| /// </summary> | ||
| /// <param name="node"></param> | ||
| public static void FreeNode(ref BTreeNode* node) | ||
| { | ||
| if (node == null) | ||
| return; | ||
|
|
||
| // If this is an internal node, free all its children first | ||
| if (node->info->type == BTreeNodeType.Internal) | ||
| { | ||
| for (int i = 0; i <= node->info->count; i++) | ||
| { | ||
| var child = node->data.children[i]; | ||
| FreeNode(ref child); | ||
| node->data.children[i] = null; | ||
| } | ||
| } | ||
|
|
||
| // Free the memory handle | ||
| if (node->memoryHandle != null) | ||
| { | ||
| NativeMemory.Free(node->memoryHandle); | ||
| node = null; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Deallocates the memory allocated for the B+Tree | ||
| /// </summary> | ||
| public void Deallocate() | ||
| { | ||
| if (root == null) | ||
| return; | ||
| Free(ref root); | ||
| Console.WriteLine("free complete"); | ||
| stats.printStats(); | ||
| root = null; | ||
| head = null; | ||
| tail = null; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Destructor for the B+tree | ||
| /// </summary> | ||
| ~BTree() | ||
| { | ||
| Deallocate(); | ||
| } | ||
|
|
||
| public ulong FastInserts => stats.totalFastInserts; | ||
| public ulong LeafCount => stats.numLeafNodes; | ||
| public ulong InternalCount => stats.numInternalNodes; | ||
|
|
||
| public ulong ValidCount => StatsValidCount(); | ||
|
|
||
| public long RootValidCount => GetValidCount(root); | ||
|
|
||
| public long TailValidCount => GetValidCount(tail); | ||
|
|
||
| public long Count() | ||
| { | ||
| return stats.numKeys; | ||
| } | ||
| public ulong StatsValidCount() | ||
| { | ||
| return stats.numValidKeys; | ||
| } | ||
|
|
||
| public long GetValidCount(BTreeNode* node) | ||
| { | ||
| return node->info->validCount; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Retrieves the first entry in the B+Tree (smallest key) | ||
| /// </summary> | ||
| /// <returns>entry fetched</returns> | ||
| public KeyValuePair<byte[], Value> First() | ||
| { | ||
| BTreeNode* leaf = head; | ||
| if (leaf == null) | ||
| { | ||
| return default; | ||
| } | ||
| byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(0), BTreeNode.KEY_SIZE).ToArray(); | ||
| return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(0)); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Retrieves the last entry in the B+Tree (largest key) | ||
| /// </summary> | ||
| /// <returns>entry fetched</returns> | ||
| public KeyValuePair<byte[], Value> Last() | ||
| { | ||
| BTreeNode* leaf = tail; | ||
| if (leaf == null) | ||
| { | ||
| return default; | ||
| } | ||
| byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(leaf->info->count - 1), BTreeNode.KEY_SIZE).ToArray(); | ||
| return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(leaf->info->count - 1)); | ||
| } | ||
|
|
||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.