diff --git a/test/distribution/test_distribution_transform_static.py b/test/distribution/test_distribution_transform_static.py
index 3d128df5acb847..0306127261ed79 100644
--- a/test/distribution/test_distribution_transform_static.py
+++ b/test/distribution/test_distribution_transform_static.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import typing
 import unittest
 
 import numpy as np
@@ -1223,5 +1224,295 @@ def test_forward_log_det_jacobian(self):
         )
 
 
+def _np_softplus(x, beta=1.0, threshold=20.0):
+    # NumPy reference for softplus; mirrors the numerical-stability
+    # shortcut of returning x once beta * x exceeds the threshold
+    # (applied array-wide here via np.any).
+    if np.any(beta * x > threshold):
+        return x
+    return 1.0 / beta * np.log1p(np.exp(beta * x))
+
+
+class TestSigmoidTransform(unittest.TestCase):
+    def setUp(self):
+        self._t = transform.SigmoidTransform()
+
+    def test_is_injective(self):
+        self.assertTrue(self._t._is_injective())
+
+    def test_domain(self):
+        self.assertTrue(isinstance(self._t._domain, variable.Real))
+
+    def test_codomain(self):
+        self.assertTrue(isinstance(self._t._codomain, variable.Variable))
+
+    @param.param_func(
+        ((np.ones((5, 10)), 1 / (1 + np.exp(-np.ones((5, 10))))),)
+    )
+    @test_with_pir_api
+    def test_forward(self, input, expected):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.SigmoidTransform()
+            out = model.forward(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            np.testing.assert_allclose(
+                result,
+                expected,
+                rtol=config.RTOL.get(str(input.dtype)),
+                atol=config.ATOL.get(str(input.dtype)),
+            )
+
+    @param.param_func(
+        ((np.ones(10), np.log(np.ones(10)) - np.log1p(-np.ones(10))),)
+    )
+    @test_with_pir_api
+    def test_inverse(self, input, expected):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.SigmoidTransform()
+            out = model.inverse(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            np.testing.assert_allclose(
+                result,
+                expected,
+                rtol=config.RTOL.get(str(input.dtype)),
+                atol=config.ATOL.get(str(input.dtype)),
+            )
+
+    @param.param_func(
+        (
+            (
+                np.ones(10),
+                -_np_softplus(-np.ones(10)) - _np_softplus(np.ones(10)),
+            ),
+        )
+    )
+    @test_with_pir_api
+    def test_forward_log_det_jacobian(self, input, expected):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.SigmoidTransform()
+            out = model.forward_log_det_jacobian(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            np.testing.assert_allclose(
+                result,
+                expected,
+                rtol=config.RTOL.get(str(input.dtype)),
+                atol=config.ATOL.get(str(input.dtype)),
+            )
+
+    @param.param_func([((), ()), ((2, 3, 5), (2, 3, 5))])
+    def test_forward_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.forward_shape(shape), expected_shape)
+
+    @param.param_func([((), ()), ((2, 3, 5), (2, 3, 5))])
+    def test_inverse_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.inverse_shape(shape), expected_shape)
+
+    @param.param_func([(np.array(1.0), np.array(1.0))])
+    @test_with_pir_api
+    def test_zerodim(self, input, expected):
+        shape = ()
+        if paddle.framework.in_pir_mode():
+            shape = []
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, 'float32')
+            model = transform.SigmoidTransform()
+            self.assertEqual(model.forward(x).shape, shape)
+            self.assertEqual(model.inverse(x).shape, shape)
+            self.assertEqual(model.forward_log_det_jacobian(x).shape, shape)
+            self.assertEqual(model.inverse_log_det_jacobian(x).shape, shape)
+            self.assertEqual(model.forward_shape(x.shape), shape)
+            self.assertEqual(model.inverse_shape(x.shape), shape)
+
+
+class TestStickBreakingTransform(unittest.TestCase):
+    def setUp(self):
+        self._t = transform.StickBreakingTransform()
+
+    def test_is_injective(self):
+        self.assertTrue(self._t._is_injective())
+
+    def test_domain(self):
+        self.assertTrue(isinstance(self._t._domain, variable.Independent))
+
+    def test_codomain(self):
+        self.assertTrue(isinstance(self._t._codomain, variable.Variable))
+
+    @param.param_func(((np.random.random(10),),))
+    @test_with_pir_api
+    def test_forward(self, input):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.StickBreakingTransform()
+            fwd = model.forward(x)
+            out = model.inverse(fwd)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            np.testing.assert_allclose(
+                result,
+                input,
+                rtol=config.RTOL.get(str(input.dtype)),
+                atol=config.ATOL.get(str(input.dtype)),
+            )
+
+    @param.param_func([((2, 3, 5), (2, 3, 6))])
+    def test_forward_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.forward_shape(shape), expected_shape)
+
+    @param.param_func([((2, 3, 5), (2, 3, 4))])
+    def test_inverse_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.inverse_shape(shape), expected_shape)
+
+    @param.param_func(((np.random.random(10),),))
+    @test_with_pir_api
+    def test_forward_log_det_jacobian(self, input):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.StickBreakingTransform()
+            out = model.forward_log_det_jacobian(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            self.assertEqual(result.shape, ())
+
+
+@param.place(config.DEVICES)
+@param.param_cls(
+    (param.TEST_CASE_NAME, 'transforms', 'axis'),
+    [
+        ('simple_one_transform', [transform.ExpTransform()], 0),
+    ],
+)
+class TestStackTransform(unittest.TestCase):
+    def setUp(self):
+        self._t = transform.StackTransform(self.transforms, self.axis)
+
+    def test_is_injective(self):
+        self.assertTrue(self._t._is_injective())
+
+    def test_domain(self):
+        self.assertTrue(isinstance(self._t._domain, variable.Stack))
+
+    def test_codomain(self):
+        self.assertTrue(isinstance(self._t._codomain, variable.Stack))
+
+    @param.param_func(
+        [
+            (np.array([[0.0, 1.0, 2.0, 3.0]]),),
+            (np.array([[-5.0, 6.0, 7.0, 8.0]]),),
+        ]
+    )
+    @test_with_pir_api
+    def test_forward(self, input):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.StackTransform(self.transforms, self.axis)
+            out = model.forward(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            self.assertEqual(tuple(result.shape), input.shape)
+
+    @param.param_func(
+        [
+            (np.array([[1.0, 2.0, 3.0]]),),
+            (np.array([[6.0, 7.0, 8.0]]),),
+        ]
+    )
+    @test_with_pir_api
+    def test_inverse(self, input):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.StackTransform(self.transforms, self.axis)
+            out = model.inverse(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            self.assertEqual(tuple(result.shape), input.shape)
+
+    @param.param_func(
+        [(np.array([[1.0, 2.0, 3.0]]),), (np.array([[6.0, 7.0, 8.0]]),)]
+    )
+    @test_with_pir_api
+    def test_forward_log_det_jacobian(self, input):
+        with paddle.static.program_guard(paddle.static.Program()):
+            x = paddle.static.data('X', input.shape, input.dtype)
+            model = transform.StackTransform(self.transforms, self.axis)
+            out = model.forward_log_det_jacobian(x)
+            place = (
+                paddle.CUDAPlace(0)
+                if paddle.core.is_compiled_with_cuda()
+                else paddle.CPUPlace()
+            )
+            exe = paddle.static.Executor(place)
+            (result,) = exe.run(feed={'X': input}, fetch_list=[out])
+            self.assertEqual(tuple(result.shape), input.shape)
+
+    @param.param_func([((), ()), ((2, 3, 5), (2, 3, 5))])
+    def test_forward_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.forward_shape(shape), expected_shape)
+
+    @param.param_func([((), ()), ((2, 3, 5), (2, 3, 5))])
+    def test_inverse_shape(self, shape, expected_shape):
+        self.assertEqual(self._t.inverse_shape(shape), expected_shape)
+
+    def test_axis(self):
+        self.assertEqual(self._t.axis, self.axis)
+
+    @param.param_func(
+        [
+            (0, 0, TypeError),
+            ([0], 0, TypeError),
+            ([paddle.distribution.ExpTransform()], 'axis', TypeError),
+        ]
+    )
+    @test_with_pir_api
+    def test_init_exception(self, transforms, axis, exc):
+        with self.assertRaises(exc):
+            paddle.distribution.StackTransform(transforms, axis)
+
+    @test_with_pir_api
+    def test_transforms(self):
+        self.assertIsInstance(self._t.transforms, typing.Sequence)
+
+
 if __name__ == '__main__':
     unittest.main()
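
Review note, not part of the patch: every test above rebuilds the same `program_guard` / `Executor` scaffolding. If this suite keeps growing, that boilerplate could be factored into one module-level helper. A minimal sketch under that assumption, using only APIs already exercised by the patch; the helper name `_run_static` is hypothetical:

```python
import numpy as np
import paddle
from paddle.distribution import transform

paddle.enable_static()


def _run_static(build_fn, input):
    # Build a single-input static program, run it on GPU when CUDA is
    # available (CPU otherwise), and return the fetched numpy result.
    with paddle.static.program_guard(paddle.static.Program()):
        x = paddle.static.data('X', input.shape, input.dtype)
        out = build_fn(x)
        place = (
            paddle.CUDAPlace(0)
            if paddle.core.is_compiled_with_cuda()
            else paddle.CPUPlace()
        )
        exe = paddle.static.Executor(place)
        (result,) = exe.run(feed={'X': input}, fetch_list=[out])
    return result


# Example: the SigmoidTransform forward check shrinks to two lines.
t = transform.SigmoidTransform()
data = np.ones((5, 10), dtype='float32')
np.testing.assert_allclose(
    _run_static(t.forward, data), 1 / (1 + np.exp(-data)), rtol=1e-6
)
```

Passing a callable keeps graph construction inside the guard, so each case still builds against a fresh `paddle.static.Program()`, matching the isolation the current per-test boilerplate provides.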