Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions extensions/MongoDbAtlas/MongoDbAtlas/MongoDbAtlasStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.KernelMemory.ContentStorage;
using Microsoft.KernelMemory.Pipeline;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Driver.GridFS;
Expand All @@ -15,8 +16,13 @@ namespace Microsoft.KernelMemory.MongoDbAtlas;
[Experimental("KMEXP03")]
public sealed class MongoDbAtlasStorage : MongoDbAtlasBaseStorage, IContentStorage
{
public MongoDbAtlasStorage(MongoDbAtlasConfig config) : base(config)
private readonly IMimeTypeDetection _mimeTypeDetection;

public MongoDbAtlasStorage(
MongoDbAtlasConfig config,
IMimeTypeDetection? mimeTypeDetection = null) : base(config)
{
this._mimeTypeDetection = mimeTypeDetection ?? new MimeTypesDetection();
}

public Task CreateIndexDirectoryAsync(string index, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -73,12 +79,13 @@ public async Task WriteFileAsync(
if (extension == ".txt")
{
using var reader = new StreamReader(streamContent);
var doc = new BsonDocument()
var doc = new BsonDocument
{
{ "_id", id },
{ "documentId", documentId },
{ "fileName", fileName },
{ "content", new BsonString(await reader.ReadToEndAsync(cancellationToken).ConfigureAwait(false)) }
{ "content", new BsonString(await reader.ReadToEndAsync(cancellationToken).ConfigureAwait(false)) },
{ "contentType", MimeTypes.PlainText }
};
await this.SaveDocumentAsync(index, id, doc, cancellationToken).ConfigureAwait(false);
}
Expand All @@ -94,6 +101,7 @@ public async Task WriteFileAsync(
doc["documentId"] = documentId;
doc["fileName"] = fileName;
doc["content"] = content;
doc["contentType"] = MimeTypes.PlainText;
await this.SaveDocumentAsync(index, id, doc, cancellationToken).ConfigureAwait(false);
}
else
Expand All @@ -105,14 +113,15 @@ public async Task WriteFileAsync(
{
{ "index", index },
{ "documentId", documentId },
{ "fileName", fileName }
{ "fileName", fileName },
{ "contentType", this._mimeTypeDetection.GetFileType(fileName) }
}
};

// Since the pattern of usage is that you can upload a file for a document id and then update, we need to delete
// any existing file with the same id check if the file exists and delete it
IAsyncCursor<GridFSFileInfo<string>> existingFile = await GetFromBucketByIdAsync(id, bucket, cancellationToken).ConfigureAwait(false);
if (existingFile.Any(cancellationToken))
if (await existingFile.AnyAsync(cancellationToken).ConfigureAwait(false))
{
await bucket.DeleteAsync(id, cancellationToken).ConfigureAwait(false);
}
Expand Down