From c24f9fb117c3e5d76b5f4bb5ffa75b1346c84c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teemu=20J=C3=A4rvinen?= Date: Thu, 21 Sep 2023 15:23:17 -0700 Subject: [PATCH 1/3] new assembly --- Project.toml | 5 +--- src/assemble.jl | 66 ++++++++++++++++++------------------------------- 2 files changed, 25 insertions(+), 46 deletions(-) diff --git a/Project.toml b/Project.toml index 177bf56..f75dee2 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ACEfit" uuid = "ad31a8ef-59f5-4a01-b543-a85c2f73e95c" authors = ["William C Witt , Christoph Ortner and contributors"] -version = "0.1.4" +version = "0.1.5-DEV" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" @@ -9,9 +9,7 @@ IterativeSolvers = "42fd0dbc-a981-5370-80f2-aaf504508153" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" LowRankApprox = "898213cb-b102-5a47-900c-97e73b919f73" Optim = "429524aa-4258-5aef-a3af-852621145aeb" -ParallelDataTransfer = "2dcacdae-9679-587a-88bb-8b444fb7085b" ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca" -SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" StaticArrays = "90137ffa-7385-5640-81b9-e52037218182" [weakdeps] @@ -33,7 +31,6 @@ MLJLinearModels = "0.9" MLJScikitLearnInterface = "0.5" LowRankApprox = "0.5.3" Optim = "1.7" -ParallelDataTransfer = "0.5.0" ProgressMeter = "1.7" PythonCall = "0.9" StaticArrays = "1.5" diff --git a/src/assemble.jl b/src/assemble.jl index a3b189f..2e635e1 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -1,56 +1,38 @@ using Distributed -using ParallelDataTransfer using ProgressMeter -using SharedArrays -struct DataPacket{T <: AbstractData} - rows::UnitRange - data::T -end - -Base.length(d::DataPacket) = count_observations(d.data) """ + assemble(data::AbstractArray, basis; kwargs...) + Assemble feature matrix and target vector for given data and basis. +`kwargs` are used to control `feature_matrix`, `target_vector` and +`weight_vector` calculations. 
""" -function assemble(data::AbstractVector{<:AbstractData}, basis) - @info "Assembling linear problem." - rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data - rows[1] = 1:count_observations(data[1]) - for i in 2:length(data) - rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) - end - packets = DataPacket.(rows, data) - sort!(packets, by = length, rev = true) - (nprocs() > 1) && sendto(workers(), basis = basis) - @info " - Creating feature matrix with size ($(rows[end][end]), $(length(basis)))." - A = SharedArray(zeros(rows[end][end], length(basis))) - Y = SharedArray(zeros(size(A, 1))) - @info " - Beginning assembly with processor count: $(nprocs())." - @showprogress pmap(packets) do p - A[p.rows, :] .= feature_matrix(p.data, basis) - Y[p.rows] .= target_vector(p.data) - GC.gc() +function assemble(data::AbstractArray, basis; kwargs...) + W = Threads.@spawn ACEfit.assemble_weights(data; kwargs...) + raw_data = @showprogress "Assembly progress:" pmap( data ) do d + A = ACEfit.feature_matrix(d, basis; kwargs...) + Y = ACEfit.target_vector(d; kwargs...) + (A, Y) end - @info " - Assembly completed." - return Array(A), Array(Y), assemble_weights(data) + A = [ a[1] for a in raw_data ] + Y = [ a[2] for a in raw_data ] + + A_final = reduce(vcat, A) + Y_final = reduce(vcat, Y) + return A_final, Y_final, fetch(W) end """ + assemble_weights(data::AbstractArray; kwargs...) + Assemble full weight vector for vector of data elements. +`kwargs` are used to give extra commands for `weight_vector` calculation. """ -function assemble_weights(data::AbstractVector{<:AbstractData}) - @info "Assembling full weight vector." - rows = Array{UnitRange}(undef, length(data)) # row ranges for each element of data - rows[1] = 1:count_observations(data[1]) - for i in 2:length(data) - rows[i] = rows[i - 1][end] .+ (1:count_observations(data[i])) +function assemble_weights(data::AbstractArray; kwargs...) 
+ w = map( data ) do d + ACEfit.weight_vector(d; kwargs...) end - packets = DataPacket.(rows, data) - sort!(packets, by = length, rev = true) - W = SharedArray(zeros(rows[end][end])) - @showprogress pmap(packets) do p - W[p.rows] .= weight_vector(p.data) - end - return Array(W) -end + return reduce(vcat, w) +end \ No newline at end of file From 19bff03a8d2a27d10954e2cfe46f4a3c8afc2047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teemu=20J=C3=A4rvinen?= Date: Thu, 21 Sep 2023 18:23:48 -0700 Subject: [PATCH 2/3] adjust to new progressmeter style --- Project.toml | 2 +- src/assemble.jl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Project.toml b/Project.toml index f75dee2..d90a3b8 100644 --- a/Project.toml +++ b/Project.toml @@ -31,7 +31,7 @@ MLJLinearModels = "0.9" MLJScikitLearnInterface = "0.5" LowRankApprox = "0.5.3" Optim = "1.7" -ProgressMeter = "1.7" +ProgressMeter = "1.8" PythonCall = "0.9" StaticArrays = "1.5" diff --git a/src/assemble.jl b/src/assemble.jl index 2e635e1..c10c6be 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -11,7 +11,7 @@ Assemble feature matrix and target vector for given data and basis. """ function assemble(data::AbstractArray, basis; kwargs...) W = Threads.@spawn ACEfit.assemble_weights(data; kwargs...) - raw_data = @showprogress "Assembly progress:" pmap( data ) do d + raw_data = @showprogress desc="Assembly progress:" pmap( data ) do d A = ACEfit.feature_matrix(d, basis; kwargs...) Y = ACEfit.target_vector(d; kwargs...) (A, Y) From 258ebcac591d2f6f01c1073aa5994e35deb4b14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teemu=20J=C3=A4rvinen?= Date: Fri, 22 Sep 2023 13:40:51 -0700 Subject: [PATCH 3/3] add batch size support --- src/assemble.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/assemble.jl b/src/assemble.jl index c10c6be..4ae1b8d 100644 --- a/src/assemble.jl +++ b/src/assemble.jl @@ -9,9 +9,9 @@ Assemble feature matrix and target vector for given data and basis. 
`kwargs` are used to control `feature_matrix`, `target_vector` and `weight_vector` calculations. """ -function assemble(data::AbstractArray, basis; kwargs...) +function assemble(data::AbstractArray, basis; batch_size=1, kwargs...) W = Threads.@spawn ACEfit.assemble_weights(data; kwargs...) - raw_data = @showprogress desc="Assembly progress:" pmap( data ) do d + raw_data = @showprogress desc="Assembly progress:" pmap( data; batch_size=batch_size ) do d A = ACEfit.feature_matrix(d, basis; kwargs...) Y = ACEfit.target_vector(d; kwargs...) (A, Y)