 
 import unittest
 import paddle
+import paddle.fluid as fluid
+import paddle.static as static
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
 import os
 
 paddle.enable_static()
@@ -25,26 +29,34 @@ def setUp(self):
         os.environ[
             "PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001,127.0.0.1:36002"
 
-    def test_pipeline_optimizer(self):
-        import paddle.distributed.fleet as fleet
-        import paddle.distributed.fleet.base.role_maker as role_maker
-        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
-        fleet.init(role)
-        with paddle.fluid.device_guard("gpu:0"):
+    def net(self):
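+        # builds a small two-FC network split across two pipeline stages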
33+ with static .device_guard ("gpu:0" ):
             input_x = paddle.fluid.layers.data(
                 name="x", shape=[32], dtype='float32')
             input_y = paddle.fluid.layers.data(
                 name="y", shape=[1], dtype='int64')
+            input_z = paddle.fluid.layers.data(
+                name="z", shape=[1], dtype="float32")
40+ with static .device_guard ("gpu:all" ):
41+ input_z = input_z * 1.0
42+ input_z .stop_gradient = True
             fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
+            fc_1 = fc_1 * input_z
 
40- with paddle . fluid .device_guard ("gpu:1" ):
46+ with static .device_guard ("gpu:1" ):
             fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
+            fc_2 = fc_2 * input_z
             prediction = paddle.fluid.layers.fc(input=[fc_2],
                                                 size=2,
                                                 act='softmax')
             cost = paddle.fluid.layers.cross_entropy(
                 input=prediction, label=input_y)
             avg_cost = paddle.fluid.layers.mean(x=cost)
+        return avg_cost
+
+    def test_pipeline_optimizer(self):
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        fleet.init(role)
 
         strategy = paddle.distributed.fleet.DistributedStrategy()
         strategy.pipeline = True
@@ -53,9 +65,43 @@ def test_pipeline_optimizer(self):
             'accumulate_steps': 2
         }
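+        # with micro_batch_size=1 and accumulate_steps=2, each pipeline
+        # step runs two micro-batches before the optimizer update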
 
-        optimizer = paddle.fluid.optimizer.Adam(0.01)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
-        optimizer.minimize(avg_cost)
+        train_prog, startup_prog = static.Program(), static.Program()
+        with static.program_guard(train_prog, startup_prog):
+            with fluid.unique_name.guard():
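+                # fresh programs plus a clean unique_name scope keep this
+                # test's parameters isolated from the other test case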
+                avg_cost = self.net()
+
+                optimizer = paddle.fluid.optimizer.Adam(0.01)
+                optimizer = fleet.distributed_optimizer(
+                    optimizer, strategy=strategy)
+                optimizer.minimize(avg_cost)
+
+    def test_pipeline_amp_optimizer(self):
+        """test pipeline & amp with device:all"""
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        fleet.init(role)
+
+        strategy = paddle.distributed.fleet.DistributedStrategy()
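+        # enable AMP on top of pipeline so both meta optimizers are applied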
+        strategy.amp = True
+        strategy.pipeline = True
+        strategy.pipeline_configs = {
+            'micro_batch_size': 1,
+            'accumulate_steps': 2
+        }
+
+        train_prog, startup_prog = static.Program(), static.Program()
+        with static.program_guard(train_prog, startup_prog):
+            with fluid.unique_name.guard():
+                avg_cost = self.net()
+
+                optimizer = paddle.fluid.optimizer.Adam(0.01)
+                optimizer = fleet.distributed_optimizer(
+                    optimizer, strategy=strategy)
+                optimizer.minimize(avg_cost)
+
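+        # the pipeline pass splits the program into per-stage section
+        # programs; verify the cross-stage communication ops it inserted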
+        ops = train_prog._pipeline_opt['section_program'].global_block().ops
+        ops = [op.type for op in ops]
+        self.assertEqual(ops.count('send_v2'), 1)
+        self.assertEqual(ops.count('recv_v2'), 1)
 
 
61107if __name__ == "__main__" :