1+ /**
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+ package org .apache .hadoop .hdfs .server .federation .router ;
19+
20+ import org .apache .hadoop .hdfs .protocol .AddErasureCodingPolicyResponse ;
21+ import org .apache .hadoop .hdfs .protocol .ECBlockGroupStats ;
22+ import org .apache .hadoop .hdfs .protocol .ECTopologyVerifierResult ;
23+ import org .apache .hadoop .hdfs .protocol .ErasureCodingPolicy ;
24+ import org .apache .hadoop .hdfs .protocol .ErasureCodingPolicyInfo ;
25+ import org .apache .hadoop .hdfs .server .federation .resolver .ActiveNamenodeResolver ;
26+ import org .apache .hadoop .hdfs .server .federation .resolver .FederationNamespaceInfo ;
27+ import org .apache .hadoop .hdfs .server .federation .resolver .RemoteLocation ;
28+ import org .apache .hadoop .hdfs .server .federation .router .async .ApplyFunction ;
29+ import org .apache .hadoop .hdfs .server .namenode .NameNode ;
30+
31+ import java .io .IOException ;
32+ import java .util .Collection ;
33+ import java .util .HashMap ;
34+ import java .util .List ;
35+ import java .util .Map ;
36+ import java .util .Set ;
37+
38+ import static org .apache .hadoop .hdfs .server .federation .router .RouterRpcServer .merge ;
39+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncApply ;
40+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .asyncReturn ;
41+
42+ public class AsyncErasureCoding extends ErasureCoding {
43+ /** RPC server to receive client calls. */
44+ private final RouterRpcServer rpcServer ;
45+ /** RPC clients to connect to the Namenodes. */
46+ private final RouterRpcClient rpcClient ;
47+ /** Interface to identify the active NN for a nameservice or blockpool ID. */
48+ private final ActiveNamenodeResolver namenodeResolver ;
49+
50+ public AsyncErasureCoding (RouterRpcServer server ) {
51+ super (server );
52+ this .rpcServer = server ;
53+ this .rpcClient = this .rpcServer .getRPCClient ();
54+ this .namenodeResolver = this .rpcClient .getNamenodeResolver ();
55+ }
56+
57+ public ErasureCodingPolicyInfo [] getErasureCodingPolicies ()
58+ throws IOException {
59+ rpcServer .checkOperation (NameNode .OperationCategory .READ );
60+
61+ RemoteMethod method = new RemoteMethod ("getErasureCodingPolicies" );
62+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
63+
64+ // Map<FederationNamespaceInfo, ErasureCodingPolicyInfo[]> ret =
65+ rpcClient .invokeConcurrent (
66+ nss , method , true , false , ErasureCodingPolicyInfo [].class );
67+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , ErasureCodingPolicyInfo []>, ErasureCodingPolicyInfo []>) ret -> {
68+ return merge (ret , ErasureCodingPolicyInfo .class );
69+ });
70+
71+ return asyncReturn (ErasureCodingPolicyInfo [].class );
72+ }
73+
74+ @ Override
75+ public Map getErasureCodingCodecs () throws IOException {
76+ rpcServer .checkOperation (NameNode .OperationCategory .READ );
77+
78+ RemoteMethod method = new RemoteMethod ("getErasureCodingCodecs" );
79+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
80+
81+ rpcClient .invokeConcurrent (
82+ nss , method , true , false , Map .class );
83+
84+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , Map >, Map >) retCodecs -> {
85+ Map <String , String > ret = new HashMap <>();
86+ Object obj = retCodecs ;
87+ @ SuppressWarnings ("unchecked" )
88+ Map <FederationNamespaceInfo , Map <String , String >> results =
89+ (Map <FederationNamespaceInfo , Map <String , String >>)obj ;
90+ Collection <Map <String , String >> allCodecs = results .values ();
91+ for (Map <String , String > codecs : allCodecs ) {
92+ ret .putAll (codecs );
93+ }
94+ return ret ;
95+ });
96+
97+ return asyncReturn (Map .class );
98+ }
99+
100+ @ Override
101+ public AddErasureCodingPolicyResponse [] addErasureCodingPolicies (
102+ ErasureCodingPolicy [] policies ) throws IOException {
103+ rpcServer .checkOperation (NameNode .OperationCategory .WRITE );
104+
105+ RemoteMethod method = new RemoteMethod ("addErasureCodingPolicies" ,
106+ new Class <?>[] {ErasureCodingPolicy [].class }, new Object [] {policies });
107+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
108+
109+ rpcClient .invokeConcurrent (
110+ nss , method , true , false , AddErasureCodingPolicyResponse [].class );
111+
112+ asyncApply (
113+ (ApplyFunction
114+ <Map <FederationNamespaceInfo , AddErasureCodingPolicyResponse []>,
115+ AddErasureCodingPolicyResponse []>) ret -> {
116+ return merge (ret , AddErasureCodingPolicyResponse .class );
117+ });
118+ return asyncReturn (AddErasureCodingPolicyResponse [].class );
119+ }
120+
121+ @ Override
122+ public ErasureCodingPolicy getErasureCodingPolicy (String src )
123+ throws IOException {
124+ rpcServer .checkOperation (NameNode .OperationCategory .READ );
125+
126+ final List <RemoteLocation > locations =
127+ rpcServer .getLocationsForPath (src , false , false );
128+ RemoteMethod remoteMethod = new RemoteMethod ("getErasureCodingPolicy" ,
129+ new Class <?>[] {String .class }, new RemoteParam ());
130+ rpcClient .invokeSequential (
131+ locations , remoteMethod , null , null );
132+
133+ asyncApply (ret -> {
134+ return (ErasureCodingPolicy ) ret ;
135+ });
136+
137+ return asyncReturn (ErasureCodingPolicy .class );
138+ }
139+
140+
141+ @ Override
142+ public ECTopologyVerifierResult getECTopologyResultForPolicies (
143+ String [] policyNames ) throws IOException {
144+ RemoteMethod method = new RemoteMethod ("getECTopologyResultForPolicies" ,
145+ new Class <?>[] {String [].class }, new Object [] {policyNames });
146+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
147+ if (nss .isEmpty ()) {
148+ throw new IOException ("No namespace availaible." );
149+ }
150+
151+ // Map<FederationNamespaceInfo, ECTopologyVerifierResult> ret
152+ rpcClient .invokeConcurrent (nss , method , true , false ,
153+ ECTopologyVerifierResult .class );
154+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , ECTopologyVerifierResult >, ECTopologyVerifierResult >) ret -> {
155+ for (Map .Entry <FederationNamespaceInfo , ECTopologyVerifierResult > entry : ret
156+ .entrySet ()) {
157+ if (!entry .getValue ().isSupported ()) {
158+ return entry .getValue ();
159+ }
160+ }
161+ // If no negative result, return the result from the first namespace.
162+ return ret .get (nss .iterator ().next ());
163+ });
164+ return asyncReturn (ECTopologyVerifierResult .class );
165+ }
166+
167+ @ Override
168+ public ECBlockGroupStats getECBlockGroupStats () throws IOException {
169+ rpcServer .checkOperation (NameNode .OperationCategory .READ );
170+
171+ RemoteMethod method = new RemoteMethod ("getECBlockGroupStats" );
172+ Set <FederationNamespaceInfo > nss = namenodeResolver .getNamespaces ();
173+ rpcClient .invokeConcurrent (
174+ nss , method , true , false , ECBlockGroupStats .class );
175+
176+ asyncApply ((ApplyFunction <Map <FederationNamespaceInfo , ECBlockGroupStats >, ECBlockGroupStats >) allStats -> {
177+ return ECBlockGroupStats .merge (allStats .values ());
178+ });
179+ return asyncReturn (ECBlockGroupStats .class );
180+ }
181+ }
0 commit comments