|
52 | 52 | import java.util.List; |
53 | 53 | import java.util.Map; |
54 | 54 | import java.util.Properties; |
| 55 | +import java.util.concurrent.TimeoutException; |
55 | 56 | import java.util.concurrent.atomic.AtomicInteger; |
56 | 57 |
|
57 | 58 | import org.apache.hadoop.conf.Configuration; |
@@ -958,13 +959,12 @@ public void testMoverWithStripedFile() throws Exception { |
958 | 959 | new String[] { "-p", barDir }); |
959 | 960 | Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc); |
960 | 961 |
|
961 | | - // verify storage types and locations |
| 962 | + // Verify storage types and locations. |
| 963 | + // Wait until Namenode confirms ARCHIVE storage type for all blocks of |
| 964 | + // fooFile. |
| 965 | + waitForUpdatedStorageType(client, fooFile, fileLen, StorageType.ARCHIVE); |
| 966 | + |
962 | 967 | locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen); |
963 | | - for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){ |
964 | | - for( StorageType type : lb.getStorageTypes()){ |
965 | | - Assert.assertEquals(StorageType.ARCHIVE, type); |
966 | | - } |
967 | | - } |
968 | 968 | StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, |
969 | 969 | dataBlocks + parityBlocks); |
970 | 970 |
|
@@ -1005,6 +1005,43 @@ public void testMoverWithStripedFile() throws Exception { |
1005 | 1005 | } |
1006 | 1006 | } |
1007 | 1007 |
|
| 1008 | + /** |
| 1009 | + * Wait until Namenode reports expected storage type for all blocks of |
| 1010 | + * given file. |
| 1011 | + * |
| 1012 | + * @param client handle all RPC calls to Namenode. |
| 1013 | + * @param file file for which we are expecting same storage type of all |
| 1014 | + * located blocks. |
| 1015 | + * @param fileLen length of the file. |
| 1016 | + * @param expectedStorageType storage type to expect for all blocks of the |
| 1017 | + * given file. |
| 1018 | + * @throws TimeoutException if the wait timed out. |
| 1019 | + * @throws InterruptedException if interrupted while waiting for the response. |
| 1020 | + */ |
| 1021 | + private void waitForUpdatedStorageType(ClientProtocol client, String file, |
| 1022 | + long fileLen, StorageType expectedStorageType) |
| 1023 | + throws TimeoutException, InterruptedException { |
| 1024 | + GenericTestUtils.waitFor(() -> { |
| 1025 | + LocatedBlocks blocks; |
| 1026 | + try { |
| 1027 | + blocks = client.getBlockLocations(file, 0, fileLen); |
| 1028 | + } catch (IOException e) { |
| 1029 | + throw new RuntimeException(e); |
| 1030 | + } |
| 1031 | + for (LocatedBlock lb : blocks.getLocatedBlocks()) { |
| 1032 | + for (StorageType type : lb.getStorageTypes()) { |
| 1033 | + if (!expectedStorageType.equals(type)) { |
| 1034 | + LOG.info("Block {} has StorageType: {}. It might not have been " |
| 1035 | + + "updated yet, awaiting the latest update.", |
| 1036 | + lb.getBlock().toString(), type); |
| 1037 | + return false; |
| 1038 | + } |
| 1039 | + } |
| 1040 | + } |
| 1041 | + return true; |
| 1042 | + }, 500, 5000, "Blocks storage type must be ARCHIVE"); |
| 1043 | + } |
| 1044 | + |
1008 | 1045 | private void initSecureConf(Configuration conf) throws Exception { |
1009 | 1046 | String username = "mover"; |
1010 | 1047 | File baseDir = GenericTestUtils.getTestDir(TestMover.class.getSimpleName()); |
|
0 commit comments