Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/sparknlp/annotator/embeddings/auto_gguf_embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,3 +532,8 @@ def pretrained(name="Qwen3_Embedding_0.6B_Q8_0_gguf", lang="en", remote_loc=None
return ResourceDownloader.downloadModel(
AutoGGUFEmbeddings, name, lang, remote_loc
)

def close(self):
    """Close the llama.cpp model backend and free its resources.

    The call is forwarded to ``close()`` on the wrapped Java object.
    The model is reloaded when it is used again.
    """
    java_backend = self._java_obj
    java_backend.close()
5 changes: 5 additions & 0 deletions python/sparknlp/annotator/seq2seq/auto_gguf_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,8 @@ def pretrained(name="Phi_4_mini_instruct_Q4_K_M_gguf", lang="en", remote_loc=Non
"""
from sparknlp.pretrained import ResourceDownloader
return ResourceDownloader.downloadModel(AutoGGUFModel, name, lang, remote_loc)

def close(self):
    """Close the llama.cpp model backend and free its resources.

    The call is forwarded to ``close()`` on the wrapped Java object.
    The model is reloaded when it is used again.
    """
    java_backend = self._java_obj
    java_backend.close()
5 changes: 5 additions & 0 deletions python/sparknlp/annotator/seq2seq/auto_gguf_reranker.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,8 @@ def pretrained(name="bge_reranker_v2_m3_Q4_K_M", lang="en", remote_loc=None):
"""
from sparknlp.pretrained import ResourceDownloader
return ResourceDownloader.downloadModel(AutoGGUFReranker, name, lang, remote_loc)

def close(self):
    """Close the llama.cpp model backend and free its resources.

    The call is forwarded to ``close()`` on the wrapped Java object.
    The model is reloaded when it is used again.
    """
    java_backend = self._java_obj
    java_backend.close()
5 changes: 5 additions & 0 deletions python/sparknlp/annotator/seq2seq/auto_gguf_vision_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,8 @@ def pretrained(name="Qwen2.5_VL_3B_Instruct_Q4_K_M_gguf", lang="en", remote_loc=
"""
from sparknlp.pretrained import ResourceDownloader
return ResourceDownloader.downloadModel(AutoGGUFVisionModel, name, lang, remote_loc)

def close(self):
    """Close the llama.cpp model backend and free its resources.

    The call is forwarded to ``close()`` on the wrapped Java object.
    The model is reloaded when it is used again.
    """
    java_backend = self._java_obj
    java_backend.close()
47 changes: 39 additions & 8 deletions python/test/annotator/embeddings/auto_gguf_embeddings_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from sparknlp.annotator import *
from sparknlp.base import *
from test.util import SparkContextForTest
from test.util import *


@pytest.mark.slow
Expand Down Expand Up @@ -58,7 +58,7 @@ def runTest(self):
embds = row["embeddings"][0]
assert embds is not None
assert (
sum(embds) > 0
sum(embds) > 0
), "Embeddings should not be zero. Was there an error on llama.cpp side?"


Expand Down Expand Up @@ -100,7 +100,7 @@ def runTest(self):
embds = row["embeddings"][0]
assert embds is not None
assert (
sum(embds) > 0
sum(embds) > 0
), "Embeddings should not be zero. Was there an error on llama.cpp side?"


Expand All @@ -114,7 +114,7 @@ def setUp(self):
self.long_data_copies = 16
self.long_text = "All work and no play makes Jack a dull boy" * 100
self.long_data = self.spark.createDataFrame(
[self.long_text] * self.long_data_copies, schema="string"
[self.long_text] * self.long_data_copies, schema="string"
).toDF("text").repartition(4)

def runTest(self):
Expand Down Expand Up @@ -144,7 +144,7 @@ def setUp(self):
self.long_data_copies = 16
self.long_text = "All work and no play makes Jack a dull boy" * 100
self.long_data = self.spark.createDataFrame(
[self.long_text] * self.long_data_copies, schema="string"
[self.long_text] * self.long_data_copies, schema="string"
).toDF("text").repartition(4)

def runTest(self):
Expand All @@ -165,7 +165,7 @@ def runTest(self):
embds = row[0][0]["embeddings"]
assert embds is not None
assert (
sum(embds) > 0
sum(embds) > 0
), "Embeddings should not be zero. Was there an error on llama.cpp side?"


Expand All @@ -185,6 +185,37 @@ def runTest(self):
)
model_writer.save(model_path)
AutoGGUFEmbeddings.load(model_path)

model_path = "file:///tmp/autoggufembeddings_spark_nlp"
AutoGGUFEmbeddings.load(model_path)
AutoGGUFEmbeddings.load(model_path)


@pytest.mark.slow
class AutoGGUFEmbeddingsCloseTest(unittest.TestCase):
    """Check that ``AutoGGUFEmbeddings.close()`` releases memory."""

    def setUp(self):
        self.spark = SparkSessionForTest.spark

        self.data = self.spark.createDataFrame(
            [
                ["The moons of Jupiter are "],
            ]
        ).toDF("text")

        self.document_assembler = (
            DocumentAssembler().setInputCol("text").setOutputCol("document")
        )

    def runTest(self):
        model = (
            AutoGGUFEmbeddings.pretrained()
            .setInputCols("document")
            .setOutputCol("embeddings")
        )

        # Run the pipeline once before measuring what close() gives back.
        pipeline = Pipeline().setStages([self.document_assembler, model])
        pipeline.fit(self.data).transform(self.data).show()

        # NOTE(review): measureRAMChange comes from test.util (not visible
        # here); presumably it returns the RAM delta in MB, negative when
        # memory was released - confirm against the helper.
        ram_change_mb = measureRAMChange(lambda: model.close())

        print(f"Freed RAM after closing the model: {ram_change_mb} MB")
        # The previous `assert (cond, msg)` asserted a non-empty tuple, which
        # is always truthy, so the check could never fail.
        self.assertLess(
            ram_change_mb, -100, "Freed RAM should be greater than 100 MB"
        )
42 changes: 37 additions & 5 deletions python/test/annotator/seq2seq/auto_gguf_model_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from sparknlp.annotator import *
from sparknlp.base import *
from test.util import SparkContextForTest
from test.util import *


@pytest.mark.slow
Expand Down Expand Up @@ -232,9 +232,7 @@ def runTest(self):
)
for row in collected:
annotation = row[0][0]
self.assertEqual(
annotation["result"], "", "Completions should be empty"
)
self.assertEqual(annotation["result"], "", "Completions should be empty")
self.assertIn(
"llamacpp_exception",
annotation["metadata"],
Expand All @@ -258,6 +256,40 @@ def runTest(self):
)
model_writer.save(model_path)
AutoGGUFModel.load(model_path)

model_path = "file:///tmp/autogguf_spark_nlp"
AutoGGUFModel.load(model_path)


@pytest.mark.slow
class AutoGGUFModelCloseTest(unittest.TestCase):
    """Check that ``AutoGGUFModel.close()`` releases memory."""

    def setUp(self):
        self.spark = SparkSessionForTest.spark

        self.data = self.spark.createDataFrame(
            [
                ["The moons of Jupiter are "],
                ["Earth is "],
                ["The moon is "],
                ["The sun is "],
            ]
        ).toDF("text")

        self.document_assembler = (
            DocumentAssembler().setInputCol("text").setOutputCol("document")
        )

    def runTest(self):
        model = (
            AutoGGUFModel.pretrained()
            .setInputCols("document")
            .setOutputCol("completions")
        )

        # Run the pipeline once before measuring what close() gives back.
        pipeline = Pipeline().setStages([self.document_assembler, model])
        pipeline.fit(self.data).transform(self.data).show()

        # NOTE(review): measureRAMChange comes from test.util (not visible
        # here); presumably it returns the RAM delta in MB, negative when
        # memory was released - confirm against the helper.
        ram_change_mb = measureRAMChange(lambda: model.close())

        print(f"Freed RAM after closing the model: {ram_change_mb} MB")
        # The previous `assert (cond, msg)` asserted a non-empty tuple, which
        # is always truthy, so the check could never fail.
        self.assertLess(
            ram_change_mb, -100, "Freed RAM should be greater than 100 MB"
        )
37 changes: 36 additions & 1 deletion python/test/annotator/seq2seq/auto_gguf_reranker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from sparknlp.annotator import *
from sparknlp.base import *
from test.util import SparkContextForTest
from test.util import *


@pytest.mark.slow
Expand Down Expand Up @@ -419,3 +419,38 @@ def runTest(self):

print("Combined filters test completed successfully")
results.select("ranked_documents").show(truncate=False)


@pytest.mark.slow
class AutoGGUFRerankerCloseTest(unittest.TestCase):
    """Check that ``AutoGGUFReranker.close()`` releases memory."""

    def setUp(self):
        self.spark = SparkSessionForTest.spark

        self.data = (
            self.spark.createDataFrame(
                [
                    ["The moons of Jupiter are "],
                ]
            )
            .toDF("text")
        )

        self.document_assembler = (
            DocumentAssembler().setInputCol("text").setOutputCol("document")
        )

    def runTest(self):
        model = (
            AutoGGUFReranker.pretrained()
            .setInputCols("document")
            .setOutputCol("reranked_documents")
            .setBatchSize(4)
            .setQuery("A query.")
        )
        # Run the pipeline once before measuring what close() gives back.
        pipeline = Pipeline().setStages([self.document_assembler, model])
        pipeline.fit(self.data).transform(self.data).show()

        # NOTE(review): measureRAMChange comes from test.util (not visible
        # here); presumably it returns the RAM delta in MB, negative when
        # memory was released - confirm against the helper.
        ram_change_mb = measureRAMChange(lambda: model.close())

        print(f"Freed RAM after closing the model: {ram_change_mb} MB")
        # The previous `assert (cond, msg)` asserted a non-empty tuple, which
        # is always truthy, so the check could never fail.
        self.assertLess(
            ram_change_mb, -100, "Freed RAM should be greater than 100 MB"
        )
100 changes: 62 additions & 38 deletions python/test/annotator/seq2seq/auto_gguf_vision_model_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,43 @@

from sparknlp.annotator import *
from sparknlp.base import *
from test.util import SparkSessionForTest
from test.util import SparkSessionForTest, measureRAMChange


def setup_context(spark):
    """Build the vision pipeline, its captioned image data and the model.

    Images are loaded as bytes from the test resources directory and every
    row receives the same caption prompt. The returned model is the same
    object that is installed as the last pipeline stage.

    Parameters
    ----------
    spark
        Session handed to ``ImageAssembler.loadImagesAsBytes``.

    Returns
    -------
    tuple
        ``(pipeline, data, model)``.
    """
    caption_stage = (
        DocumentAssembler().setInputCol("caption").setOutputCol("caption_document")
    )
    image_stage = ImageAssembler().setInputCol("image").setOutputCol("image_assembler")

    images_path = "../src/test/resources/image/"
    prompt = lit(
        "Describe in a short and easy to understand sentence what you see in the image."
    )
    # Add a caption to each image.
    captioned_images = ImageAssembler.loadImagesAsBytes(spark, images_path).withColumn(
        "caption", prompt
    )

    n_predict = 40

    # Column wiring first, then runtime and sampling parameters; the setter
    # order is the same as in the single chained expression this replaces.
    vision_model = AutoGGUFVisionModel.pretrained()
    vision_model = vision_model.setInputCols(
        ["caption_document", "image_assembler"]
    ).setOutputCol("completions")
    vision_model = vision_model.setBatchSize(2).setNGpuLayers(99).setNCtx(4096)
    vision_model = (
        vision_model.setMinKeep(0)
        .setMinP(0.05)
        .setNPredict(n_predict)
        .setPenalizeNl(True)
        .setRepeatPenalty(1.18)
        .setTemperature(0.05)
        .setTopK(40)
        .setTopP(0.95)
    )

    stages = [caption_stage, image_stage, vision_model]
    vision_pipeline = Pipeline().setStages(stages)

    return vision_pipeline, captioned_images, vision_model


@pytest.mark.slow
Expand All @@ -27,40 +63,7 @@ def setUp(self):
self.spark = SparkSessionForTest.spark

def runTest(self):
documentAssembler = (
DocumentAssembler().setInputCol("caption").setOutputCol("caption_document")
)
imageAssembler = (
ImageAssembler().setInputCol("image").setOutputCol("image_assembler")
)
imagesPath = "../src/test/resources/image/"
data = ImageAssembler.loadImagesAsBytes(self.spark, imagesPath).withColumn(
"caption",
lit(
"Describe in a short and easy to understand sentence what you see in the image."
),
) # Add a caption to each image.
nPredict = 40
model: AutoGGUFVisionModel = (
AutoGGUFVisionModel.pretrained()
.setInputCols(["caption_document", "image_assembler"])
.setOutputCol("completions")
.setBatchSize(2)
.setNGpuLayers(99)
.setNCtx(4096)
.setMinKeep(0)
.setMinP(0.05)
.setNPredict(nPredict)
.setPenalizeNl(True)
.setRepeatPenalty(1.18)
.setTemperature(0.05)
.setTopK(40)
.setTopP(0.95)
)
pipeline = Pipeline().setStages([documentAssembler, imageAssembler, model])
# pipeline.fit(data).transform(data).selectExpr(
# "reverse(split(image.origin, '/'))[0] as image_name", "completions.result"
# ).show(truncate=False)
pipeline, data, _ = setup_context(self.spark)

results = pipeline.fit(data).transform(data).collect()

Expand All @@ -83,7 +86,7 @@ def runTest(self):

print(f"Image: {image_name}, Completion: {completion}")
assert (
expectedWords[image_name] in completion.lower()
expectedWords[image_name] in completion.lower()
), f"Expected '{expectedWords[image_name]}' in '{completion.lower()}'"


Expand All @@ -103,6 +106,27 @@ def runTest(self):
)
model_writer.save(model_path)
AutoGGUFVisionModel.load(model_path)

model_path = "file:///tmp/autoggufvisionmodel_spark_nlp"
AutoGGUFVisionModel.load(model_path)
AutoGGUFVisionModel.load(model_path)


@pytest.mark.slow
class AutoGGUFVisionModelCloseTest(unittest.TestCase):
    """Check that ``AutoGGUFVisionModel.close()`` releases memory."""

    def setUp(self):
        self.spark = SparkSessionForTest.spark

    def runTest(self):
        pipeline, data, model = setup_context(self.spark)

        # Run the pipeline on a single row before measuring what close()
        # gives back.
        (
            pipeline.fit(data)
            .transform(data.limit(1))
            .select("completions.result")
            .show(truncate=False)
        )

        # NOTE(review): measureRAMChange comes from test.util (not visible
        # here); presumably it returns the RAM delta in MB, negative when
        # memory was released - confirm against the helper.
        ram_change_mb = measureRAMChange(lambda: model.close())

        print(f"Freed RAM after closing the model: {ram_change_mb} MB")
        # The previous `assert (cond, msg)` asserted a non-empty tuple, which
        # is always truthy, so the check could never fail.
        self.assertLess(
            ram_change_mb, -100, "Freed RAM should be greater than 100 MB"
        )
Loading