Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 66 additions & 171 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,6 @@ impl BlockingDataset {
Ok(branches)
}

pub fn create_branch(
&mut self,
branch: &str,
version: u64,
source_branch: Option<&str>,
) -> Result<Self> {
let reference = match source_branch {
Some(b) => Ref::from((b, version)),
None => Ref::from(version),
};
let inner = RT.block_on(self.inner.create_branch(branch, reference, None))?;
Ok(Self { inner })
}

pub fn delete_branch(&mut self, branch: &str) -> Result<()> {
RT.block_on(self.inner.delete_branch(branch))?;
Ok(())
Expand All @@ -258,17 +244,8 @@ impl BlockingDataset {
Ok(Self { inner })
}

pub fn create_tag(
&mut self,
tag: &str,
version_number: u64,
branch: Option<&str>,
) -> Result<()> {
RT.block_on(
self.inner
.tags()
.create_on_branch(tag, version_number, branch),
)?;
pub fn create_tag(&mut self, tag: &str, reference: Ref) -> Result<()> {
RT.block_on(self.inner.tags().create(tag, reference))?;
Ok(())
}

Expand All @@ -277,8 +254,8 @@ impl BlockingDataset {
Ok(())
}

pub fn update_tag(&mut self, tag: &str, version: u64, branch: Option<&str>) -> Result<()> {
RT.block_on(self.inner.tags().update_on_branch(tag, version, branch))?;
pub fn update_tag(&mut self, tag: &str, reference: Ref) -> Result<()> {
RT.block_on(self.inner.tags().update(tag, reference))?;
Ok(())
}

Expand Down Expand Up @@ -1357,50 +1334,20 @@ fn inner_shallow_clone<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
target_path: JString,
reference: JObject,
jref: JObject,
storage_options: JObject,
) -> Result<JObject<'local>> {
let target_path_str = target_path.extract(env)?;
let storage_options = env.get_optional(&storage_options, |env, map_obj| {
let jmap = JMap::from_env(env, &map_obj)?;
to_rust_map(env, &jmap)
})?;

let reference = {
let version_number = env.get_optional_u64_from_method(&reference, "getVersionNumber")?;
let tag_name = env.get_optional_string_from_method(&reference, "getTagName")?;
let branch_name = env.get_optional_string_from_method(&reference, "getBranchName")?;
match (version_number, branch_name, tag_name) {
(Some(version_number), branch_name, None) => {
Ref::Version(branch_name, Some(version_number))
}
(None, None, Some(tag_name)) => Ref::Tag(tag_name),
_ => {
return Err(Error::input_error(
"One of (optional branch, version_number) and tag must be specified"
.to_string(),
))
}
}
};

let reference = transform_jref_to_ref(jref, env)?;
let storage_opts = transform_jstorage_options(storage_options, env)?;
let new_ds = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
RT.block_on(
dataset_guard.inner.shallow_clone(
&target_path_str,
reference,
storage_options
.map(|options| {
Some(ObjectStoreParams {
storage_options: Some(options),
..Default::default()
})
})
.unwrap_or(None),
),
)?
RT.block_on(dataset_guard.inner.shallow_clone(
target_path_str.as_str(),
reference,
storage_opts,
))?
};

BlockingDataset { inner: new_ds }.into_java(env)
Expand Down Expand Up @@ -1904,11 +1851,17 @@ fn inner_list_tags<'local>(
let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;

for (tag_name, tag_contents) in tag_map {
let branch_name: JObject = if let Some(branch_name) = tag_contents.branch.as_ref() {
env.new_string(branch_name)?.into()
} else {
JObject::null()
};
let java_tag = env.new_object(
"org/lance/Tag",
"(Ljava/lang/String;JI)V",
"(Ljava/lang/String;Ljava/lang/String;JI)V",
&[
JValue::Object(&env.new_string(tag_name)?.into()),
JValue::Object(&branch_name),
JValue::Long(tag_contents.version as i64),
JValue::Int(tag_contents.manifest_size as i32),
],
Expand All @@ -1928,53 +1881,25 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCreateTag(
mut env: JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
) {
ok_or_throw_without_return!(
env,
inner_create_tag(&mut env, java_dataset, jtag_name, jtag_version)
)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeCreateTagOnBranch(
mut env: JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jbranch: JString,
jref: JObject,
) {
ok_or_throw_without_return!(
env,
inner_create_tag_on_branch(&mut env, java_dataset, jtag_name, jtag_version, jbranch)
inner_create_tag(&mut env, java_dataset, jtag_name, jref)
)
}

fn inner_create_tag(
env: &mut JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jref: JObject,
) -> Result<()> {
let tag = jtag_name.extract(env)?;
let reference = transform_jref_to_ref(jref, env)?;
let mut dataset_guard =
{ unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? };
dataset_guard.create_tag(tag.as_str(), jtag_version as u64, None)?;
Ok(())
}

fn inner_create_tag_on_branch(
env: &mut JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jbranch: JString,
) -> Result<()> {
let tag = jtag_name.extract(env)?;
let branch = jbranch.extract(env)?;
let mut dataset_guard =
{ unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? };
dataset_guard.create_tag(tag.as_str(), jtag_version as u64, Some(branch.as_str()))?;
dataset_guard.create_tag(tag.as_str(), reference)?;
Ok(())
}

Expand All @@ -1999,54 +1924,25 @@ pub extern "system" fn Java_org_lance_Dataset_nativeUpdateTag(
mut env: JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jref: JObject,
) {
ok_or_throw_without_return!(
env,
inner_update_tag(&mut env, java_dataset, jtag_name, jtag_version)
inner_update_tag(&mut env, java_dataset, jtag_name, jref)
)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeUpdateTagOnBranch(
mut env: JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jbranch: JString,
) {
ok_or_throw_without_return!(
env,
inner_update_tag_on_branch(&mut env, java_dataset, jtag_name, jtag_version, jbranch)
)
}

fn inner_update_tag_on_branch(
env: &mut JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jbranch: JString,
) -> Result<()> {
let tag = jtag_name.extract(env)?;
let branch = jbranch.extract(env)?;
let mut dataset_guard =
{ unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? };
dataset_guard.update_tag(tag.as_str(), jtag_version as u64, Some(branch.as_str()))?;
Ok(())
}

fn inner_update_tag(
env: &mut JNIEnv,
java_dataset: JObject,
jtag_name: JString,
jtag_version: jlong,
jref: JObject,
) -> Result<()> {
let tag = jtag_name.extract(env)?;
let reference = transform_jref_to_ref(jref, env)?;
let mut dataset_guard =
{ unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? };
dataset_guard.update_tag(tag.as_str(), jtag_version as u64, None)?;
Ok(())
dataset_guard.update_tag(tag.as_str(), reference)
}

#[no_mangle]
Expand Down Expand Up @@ -2128,69 +2024,68 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCreateBranch<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
jbranch: JString,
jversion: jlong,
source_branch_obj: JObject, // Optional<String>
jref: JObject,
jstorage_options: JObject, // Optional<String>
) -> JObject<'local> {
ok_or_throw!(
env,
inner_create_branch(&mut env, java_dataset, jbranch, jversion, source_branch_obj)
inner_create_branch(&mut env, java_dataset, jbranch, jref, jstorage_options)
)
}

fn inner_create_branch<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
jbranch: JString,
jversion: jlong,
source_branch_obj: JObject, // Optional<String>
jref: JObject,
jstorage_options: JObject, // Optional<String>
) -> Result<JObject<'local>> {
let branch_name: String = jbranch.extract(env)?;
let version = jversion as u64;
let source_branch = env.get_string_opt(&source_branch_obj)?;
let new_dataset = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
dataset_guard.create_branch(&branch_name, version, source_branch.as_deref())?
};
new_dataset.into_java(env)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeCreateBranchOnTag<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
jbranch: JString,
jtag_name: JString,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_create_branch_on_tag(&mut env, java_dataset, jbranch, jtag_name)
)
}

fn inner_create_branch_on_tag<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
jbranch: JString,
jtag_name: JString,
) -> Result<JObject<'local>> {
let branch_name: String = jbranch.extract(env)?;
let tag_name: String = jtag_name.extract(env)?;
let reference = Ref::from(tag_name.as_str());
let reference = transform_jref_to_ref(jref, env)?;
let storage_opts = transform_jstorage_options(jstorage_options, env)?;

let new_blocking_dataset = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
let inner = RT.block_on(dataset_guard.inner.create_branch(
branch_name.as_str(),
reference,
None,
storage_opts,
))?;
BlockingDataset { inner }
};
new_blocking_dataset.into_java(env)
}

fn transform_jref_to_ref(jref: JObject, env: &mut JNIEnv) -> Result<Ref> {
let source_tag_name = env.get_optional_string_from_method(&jref, "getTagName")?;
let source_version_number = env.get_optional_u64_from_method(&jref, "getVersionNumber")?;
let source_branch = env.get_optional_string_from_method(&jref, "getBranchName")?;
if let Some(tag_name) = source_tag_name {
Ok(Ref::Tag(tag_name))
} else {
Ok(Ref::Version(source_branch, source_version_number))
}
}

fn transform_jstorage_options(
jstorage_options: JObject,
env: &mut JNIEnv,
) -> Result<Option<ObjectStoreParams>> {
let storage_options = env.get_optional(&jstorage_options, |env, map_obj| {
let jmap = JMap::from_env(env, &map_obj)?;
to_rust_map(env, &jmap)
})?;
Ok(storage_options
.map(|options| {
Some(ObjectStoreParams {
storage_options: Some(options),
..Default::default()
})
})
.unwrap_or(None))
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeDeleteBranch(
mut env: JNIEnv,
Expand Down
Loading