117117import java .util .concurrent .atomic .AtomicBoolean ;
118118
119119import static org .apache .hadoop .hdfs .server .common .HdfsServerConstants .BlockUCState .UNDER_CONSTRUCTION ;
120+ import static org .apache .hadoop .test .MetricsAsserts .getLongCounter ;
120121import static org .apache .hadoop .test .MetricsAsserts .getMetrics ;
121122import static org .junit .Assert .assertEquals ;
122123import static org .junit .Assert .assertFalse ;
@@ -2121,4 +2122,83 @@ public void testBlockReportAfterDataNodeRestart() throws Exception {
21212122 assertEquals (2 , locs [0 ].getHosts ().length );
21222123 }
21232124 }
2125+
2126+ /**
2127+ * Test processing toInvalidate in block reported, if the block not exists need
2128+ * to set the numBytes of the block to NO_ACK,
2129+ * the DataNode processing will not report incremental blocks.
2130+ */
2131+ @ Test (timeout = 360000 )
2132+ public void testBlockReportSetNoAckBlockToInvalidate () throws Exception {
2133+ Configuration conf = new HdfsConfiguration ();
2134+ conf .setInt (DFSConfigKeys .DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY , 500 );
2135+ conf .setInt (DFSConfigKeys .DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY , 10 );
2136+ conf .setLong (DFSConfigKeys .DFS_HEARTBEAT_INTERVAL_KEY , 1L );
2137+ try (MiniDFSCluster cluster =
2138+ new MiniDFSCluster .Builder (conf ).numDataNodes (1 ).build ()) {
2139+ cluster .waitActive ();
2140+ BlockManager blockManager = cluster .getNamesystem ().getBlockManager ();
2141+ DistributedFileSystem fs = cluster .getFileSystem ();
2142+ // Write file.
2143+ Path file = new Path ("/test" );
2144+ DFSTestUtil .createFile (fs , file , 10240L , (short )1 , 0L );
2145+ DFSTestUtil .waitReplication (fs , file , (short ) 1 );
2146+ LocatedBlock lb = DFSTestUtil .getAllBlocks (fs , file ).get (0 );
2147+ DatanodeInfo [] loc = lb .getLocations ();
2148+ assertEquals (1 , loc .length );
2149+ List <DataNode > datanodes = cluster .getDataNodes ();
2150+ assertEquals (1 , datanodes .size ());
2151+ DataNode datanode = datanodes .get (0 );
2152+ assertEquals (datanode .getDatanodeUuid (), loc [0 ].getDatanodeUuid ());
2153+
2154+ MetricsRecordBuilder rb = getMetrics (datanode .getMetrics ().name ());
2155+ // Check the IncrementalBlockReportsNumOps of DataNode, it will be 0.
2156+ assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2157+
2158+ // Delete file and remove block.
2159+ fs .delete (file , false );
2160+
2161+ // Wait for the processing of the marked deleted block to complete.
2162+ BlockManagerTestUtil .waitForMarkedDeleteQueueIsEmpty (blockManager );
2163+ assertNull (blockManager .getStoredBlock (lb .getBlock ().getLocalBlock ()));
2164+
2165+ // Expire heartbeat on the NameNode,and datanode to be marked dead.
2166+ datanode .setHeartbeatsDisabledForTests (true );
2167+ cluster .setDataNodeDead (datanode .getDatanodeId ());
2168+ assertFalse (blockManager .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2169+
2170+ // Wait for re-registration and heartbeat.
2171+ datanode .setHeartbeatsDisabledForTests (false );
2172+ final DatanodeDescriptor dn1Desc = cluster .getNamesystem (0 )
2173+ .getBlockManager ().getDatanodeManager ()
2174+ .getDatanode (datanode .getDatanodeId ());
2175+ GenericTestUtils .waitFor (
2176+ () -> dn1Desc .isAlive () && dn1Desc .isHeartbeatedSinceRegistration (),
2177+ 100 , 5000 );
2178+
2179+ // Trigger BlockReports and block is not exists,
2180+ // it will add invalidateBlocks and set block numBytes be NO_ACK.
2181+ cluster .triggerBlockReports ();
2182+ GenericTestUtils .waitFor (
2183+ () -> blockManager .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()),
2184+ 100 , 1000 );
2185+
2186+ // Trigger schedule blocks for deletion at datanode.
2187+ int workCount = blockManager .computeInvalidateWork (1 );
2188+ assertEquals (1 , workCount );
2189+ assertFalse (blockManager .containsInvalidateBlock (loc [0 ], lb .getBlock ().getLocalBlock ()));
2190+
2191+ // Wait for the blocksRemoved value in DataNode to be 1.
2192+ GenericTestUtils .waitFor (
2193+ () -> datanode .getMetrics ().getBlocksRemoved () == 1 ,
2194+ 100 , 5000 );
2195+
2196+ // Trigger immediate deletion report at datanode.
2197+ cluster .triggerDeletionReports ();
2198+
2199+ // Delete block numBytes be NO_ACK and will not deletion block report,
2200+ // so check the IncrementalBlockReportsNumOps of DataNode still 1.
2201+ assertEquals (1 , getLongCounter ("IncrementalBlockReportsNumOps" , rb ));
2202+ }
2203+ }
21242204}
0 commit comments