Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion paddle/operators/recv_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class RecvOp : public framework::OperatorBase {

// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
int64_t barrier_size = param_count * fan_in;
size_t barrier_size = param_count * fan_in;
while (!exit_flag) {
// Receive from multiple trainers; we don't care about the order in which
// the gradients arrive, just add suffix 0~n and merge the gradients.
Expand Down
21 changes: 7 additions & 14 deletions paddle/operators/split_selected_rows_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker {
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "The input SelectedRows.");
AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable();
AddAttr<std::vector<int>>("rows_sections", "Rows section for output.")
.SetDefault(std::vector<int>({}));
AddAttr<std::vector<int>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int>({}));
Expand All @@ -35,16 +33,16 @@ height_sections is only needed when need to split the dims of the original tenso

Example:
Input:
X.rows = {0, 7, 5}
X.rows = {7, 5}
X.height = 12
Attr:
rows_sections = {1, 2}
height_sections = {}
height_sections = {4, 8}
Out:
out0.rows = {0}
out0.height = 12
out1.rows = {7, 5}
out2.height = 12
out0.rows = {}
out0.height = 4

out1.rows = {7, 5}
out1.height = 8

)DOC");
}
Expand All @@ -61,11 +59,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel {

std::vector<int> height_sections =
ctx->Attrs().Get<std::vector<int>>("height_sections");
std::vector<int> rows_sections =
ctx->Attrs().Get<std::vector<int>>("rows_sections");
PADDLE_ENFORCE_EQ(
rows_sections.size(), ctx->Outputs("Out").size(),
"The size of rows section should be the same with Outputs size.");
int64_t n = ctx->Outputs("Out").size();

std::vector<framework::DDim> outs_dims;
Expand Down
68 changes: 49 additions & 19 deletions paddle/operators/split_selected_rows_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,70 @@ limitations under the License. */

#include <vector>
#include "paddle/framework/op_registry.h"
#include "paddle/operators/math/selected_rows_functor.h"

namespace paddle {
namespace operators {

// Locates the output section that owns `row`.
//
// `height_sections` is read as consecutive half-open ranges laid end to end:
// section k covers [sum(height_sections[0..k-1]),
//                   sum(height_sections[0..k])).
// Returns the index of the first section whose range contains `row`, or -1
// when no section does (e.g. a negative row, or a row at/after the total
// height).
static int FindOutIdx(int row, const std::vector<int>& height_sections) {
  int section_begin = 0;
  int out_idx = 0;
  for (const int section_height : height_sections) {
    const int section_end = section_begin + section_height;
    if (section_begin <= row && row < section_end) {
      return out_idx;
    }
    // Advance to the start of the next section.
    section_begin = section_end;
    ++out_idx;
  }
  return -1;
}

template <typename DeviceContext, typename T>
class SplitSelectedRowsOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto* x = ctx.Input<framework::SelectedRows>("X");
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");

auto rows_sections = ctx.Attr<std::vector<int>>("rows_sections");
auto height_sections = ctx.Attr<std::vector<int>>("height_sections");

int64_t n = outs.size();
int offset = 0;
auto x_rows = x->rows();
std::vector<std::vector<int>> outs_rows_idx;
outs_rows_idx.resize(outs.size());

for (int64_t i = 0; i < n; ++i) {
framework::Vector<int64_t> out_rows;
for (int64_t j = 0; j < rows_sections[i]; ++j) {
out_rows.push_back(x->rows()[offset + j]);
}
auto row_numel = x->value().numel() / x->value().dims()[0];
auto src = x->value().data<T>();

for (size_t i = 0; i < x_rows.size(); ++i) {
int out_idx = FindOutIdx(x_rows[i], height_sections);
outs_rows_idx[out_idx].push_back(i);
}
auto place = ctx.GetPlace();

auto& out = outs[i];
auto x_dims = x->GetCompleteDims();
x_dims[0] = rows_sections[i];
out->mutable_value()->mutable_data<T>(x_dims, ctx.GetPlace());
framework::Copy(x->value().Slice(offset, rows_sections[i] + offset),
x->place(), ctx.device_context(), out->mutable_value());
outs[i]->set_rows(out_rows);
if (height_sections.size()) {
outs[i]->set_height(height_sections[i]);
for (size_t i = 0; i < outs_rows_idx.size(); ++i) {
auto rows_idx = outs_rows_idx[i];
if (rows_idx.size() > 0) {
auto dims = x->GetCompleteDims();
dims[0] = rows_idx.size();
outs[i]->mutable_value()->mutable_data<T>(dims, x->place());
for (auto idx : rows_idx) {
outs[i]->mutable_rows()->push_back(x_rows[idx]);
}
auto dst = outs[i]->mutable_value()->mutable_data<T>(ctx.GetPlace());
for (size_t j = 0; j < rows_idx.size(); j++) {
if (platform::is_cpu_place(place)) {
memory::Copy(platform::CPUPlace(), dst + j * row_numel,
platform::CPUPlace(), src + rows_idx[j] * row_numel,
sizeof(T) * row_numel);
} else {
#ifdef PADDLE_WITH_CUDA
auto stream = ctx.cuda_device_context().stream();
memory::Copy(platform::CUDAPlace(), dst + j * row_numel,
platform::CUDAPlace(), src + rows_idx[j] * row_numel,
sizeof(T) * row_numel, stream);
#else
PADDLE_THROW("Paddle is not compiled with GPU");
#endif
}
}
}
offset += rows_sections[i];
}
}
};
Expand Down
32 changes: 23 additions & 9 deletions python/paddle/v2/fluid/distribute_transpiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from layer_helper import LayerHelper
from distributed_spliter import *
import math
from . import core


class VarBlock:
Expand Down Expand Up @@ -216,15 +217,28 @@ def _append_split_op(self, program, gradblocks):
if len(splited_vars) <= 1:
continue
orig_var = program.global_block().vars[varname]
sections = []
for v in splited_vars:
sections.append(v.shape[0])
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={"sections": sections} # assume split evenly
)
# Dispatch on the variable's *type*. Comparing the Variable object itself
# against a VarType enum value never matches, which would silently skip
# appending any split op for this variable.
if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
    # Each split block's first dimension becomes the height of the
    # corresponding output SelectedRows.
    height_sections = [v.shape[0] for v in splited_vars]
    program.global_block().append_op(
        type="split_selected_rows",
        inputs={"X": orig_var},
        outputs={"Out": splited_vars},
        attrs={"height_sections": height_sections})
elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
    # Dense tensors are split along dim 0 using each block's row count.
    sections = [v.shape[0] for v in splited_vars]
    program.global_block().append_op(
        type="split",
        inputs={"X": orig_var},
        outputs={"Out": splited_vars},
        attrs={"sections": sections}  # assume split evenly
    )
else:
    # The exception must be raised; merely constructing it lets an
    # unsupported variable type pass through without any split op.
    raise AssertionError("Variable type should be in set "
                         "[LOD_TENSOR, SELECTED_ROWS]")
return var_mapping

def get_trainer_program(self):
Expand Down
41 changes: 21 additions & 20 deletions python/paddle/v2/fluid/tests/test_split_selected_rows_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def test_check_grad(self):

def check_with_place(self, place):
scope = core.Scope()
rows = [0, 5, 7, 4]
height = 10
rows = [0, 5, 7, 4, 20]
height = 20
row_numel = 2

# initialize input variable X
Expand All @@ -45,47 +45,49 @@ def check_with_place(self, place):
np_array = np.ones((len(rows), row_numel)).astype("float32")
np_array[0, 0] = 2.0
np_array[2, 1] = 4.0
np_array[4, 1] = 8.0
x_tensor = x.get_tensor()
x_tensor.set(np_array, place)

rows_sections = [2, 2]
height_sections = []
height_sections = [5, 5, 5, 5, 3]

# initialize output variables [out0, out1]
out0 = scope.var('out0').get_selected_rows()
out1 = scope.var('out1').get_selected_rows()
outs_name = ["out%d" % i for i in xrange(len(height_sections))]
outs = [
scope.var(var_name).get_selected_rows() for var_name in outs_name
]

# expected output selected rows
expected_out0_rows = [0, 5]
expected_out1_rows = [7, 4]
expected_height = height
expected_out0_rows = [0, 4]
expected_out1_rows = [5, 7]
expected_out4_rows = [20]

op = Operator(
"split_selected_rows",
X="X",
Out=["out0", "out1"],
rows_sections=rows_sections,
Out=outs_name,
height_sections=height_sections)

op.run(scope, place)

self.assertEqual(out0.rows(), expected_out0_rows)
self.assertEqual(out1.rows(), expected_out1_rows)
self.assertEqual(outs[0].rows(), expected_out0_rows)
self.assertEqual(outs[1].rows(), expected_out1_rows)
self.assertEqual(outs[4].rows(), expected_out4_rows)

self.assertEqual(out0.height(), expected_height)
self.assertEqual(out1.height(), expected_height)
self.assertEqual(outs[0].height(), height_sections[0])
self.assertEqual(outs[4].height(), height_sections[4])

self.assertAlmostEqual(2.0, np.array(out0.get_tensor())[0, 0])
self.assertAlmostEqual(4.0, np.array(out1.get_tensor())[0, 1])
self.assertAlmostEqual(2.0, np.array(outs[0].get_tensor())[0, 0])
self.assertAlmostEqual(4.0, np.array(outs[1].get_tensor())[1, 1])
self.assertAlmostEqual(8.0, np.array(outs[4].get_tensor())[0, 1])

def check_grad_with_place(self, place):
scope = core.Scope()
height = 10
row_numel = 2

# attr
rows_sections = [2, 2]
height_sections = []
height_sections = [5, 5]

# initialize input variable X
out0_grad = scope.var("out0@GRAD").get_selected_rows()
Expand All @@ -112,7 +114,6 @@ def check_grad_with_place(self, place):
"sum",
X=["out0@GRAD", "out1@GRAD"],
Out="X@GRAD",
rows_sections=rows_sections,
height_sections=height_sections)

grad_op.run(scope, place)
Expand Down