Skip to content

Commit f3e1bd5

Browse files
committed
HADOOP-18257 audit log parser
* tool is invoked through hadoop s3guard command
* which can now also be invoked as "hadoop s3a"!
* tests are improved.
* OperationDuration implements DurationTracker for a bit more completeness

TODO
* split out the record parsing into a hadoop MR record read/write
  to support large-scale parsing
* cli tool just glues that together either for small parallelised
  extraction or for aggregation to one file.
* but a bulk job would work with a larger dataset

Change-Id: I25e333592d1058b460b0bfda5313a20de13c2e35
1 parent d8a9eea commit f3e1bd5

8 files changed

Lines changed: 173 additions & 129 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DurationInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public String toString() {
9393

9494
@Override
9595
public void close() {
96-
finished();
96+
super.close();
9797
if (logAtInfo) {
9898
log.info("{}", this);
9999
} else {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/OperationDuration.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222

2323
import org.apache.hadoop.classification.InterfaceAudience;
2424
import org.apache.hadoop.classification.InterfaceStability;
25+
import org.apache.hadoop.fs.statistics.DurationTracker;
2526

2627
/**
2728
* Little duration counter.
2829
*/
2930
@InterfaceAudience.Public
3031
@InterfaceStability.Unstable
31-
public class OperationDuration {
32+
public class OperationDuration implements DurationTracker {
3233

3334
/**
3435
* Time in millis when the operation started.
@@ -65,6 +66,16 @@ public void finished() {
6566
finished = time();
6667
}
6768

69+
@Override
70+
public void failed() {
71+
finished();
72+
}
73+
74+
@Override
75+
public void close() {
76+
finished();
77+
}
78+
6879
/**
6980
* Return the duration as {@link #humanTime(long)}.
7081
* @return a printable duration.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java

Lines changed: 20 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.Closeable;
2222
import java.io.IOException;
23+
import java.io.PrintStream;
2324
import java.io.PrintWriter;
2425
import java.net.URI;
2526
import java.util.Arrays;
@@ -34,6 +35,7 @@
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.hadoop.fs.Path;
3637
import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;
38+
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
3739
import org.apache.hadoop.util.ExitUtil;
3840
import org.apache.hadoop.util.Tool;
3941
import org.apache.hadoop.util.ToolRunner;
@@ -47,10 +49,15 @@
4749
* Its functionality is to parse the audit log files
4850
* and generate avro file.
4951
*/
50-
public class AuditTool extends Configured implements Tool, Closeable {
52+
public class AuditTool extends S3GuardTool {
5153

5254
private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class);
5355

56+
/**
57+
* Name of audit tool: {@value}.
58+
*/
59+
public static final String AUDIT = "audit";
60+
5461
private final S3AAuditLogMergerAndParser s3AAuditLogMergerAndParser =
5562
new S3AAuditLogMergerAndParser();
5663

@@ -82,8 +89,8 @@ public class AuditTool extends Configured implements Tool, Closeable {
8289

8390
private PrintWriter out;
8491

85-
public AuditTool() {
86-
super();
92+
public AuditTool(final Configuration conf) {
93+
super(conf);
8794
}
8895

8996
/**
@@ -101,15 +108,19 @@ public String getName() {
101108

102109
/**
103110
* This run method in AuditTool takes source and destination path of bucket,
104-
* and check if there are directories and pass these paths to merge and
111+
* and checks if there are directories and pass these paths to merge and
105112
* parse audit log files.
106113
*
107114
* @param args argument list
108115
* @return SUCCESS i.e, '0', which is an exit code
109116
* @throws Exception on any failure.
110117
*/
111118
@Override
112-
public int run(String[] args) throws Exception {
119+
public int run(final String[] args, final PrintStream stream)
120+
throws Exception, ExitUtil.ExitException {
121+
122+
this.out = new PrintWriter(stream);
123+
113124
preConditionArgsSizeCheck(args);
114125
List<String> paths = Arrays.asList(args);
115126

@@ -120,17 +131,7 @@ public int run(String[] args) throws Exception {
120131
Path destPath = new Path(paths.get(1));
121132

122133
// Setting the file system
123-
URI fsURI = new URI(logsPath.toString());
124-
FileSystem fileSystem = FileSystem.get(fsURI, new Configuration());
125-
126-
FileStatus logsFileStatus = fileSystem.getFileStatus(logsPath);
127-
if (logsFileStatus.isFile()) {
128-
errorln("Expecting a directory, but " + logsPath.getName() + " is a"
129-
+ " file which was passed as an argument");
130-
throw invalidArgs(
131-
"Expecting a directory, but " + logsPath.getName() + " is a"
132-
+ " file which was passed as an argument");
133-
}
134+
FileSystem fileSystem = logsPath.getFileSystem(getConf());
134135

135136
// Calls S3AAuditLogMergerAndParser for implementing merging, passing of
136137
// audit log files and converting into avro file
@@ -151,40 +152,9 @@ private void preConditionArgsSizeCheck(String[] args) {
151152
}
152153
}
153154

154-
protected static void errorln(String x) {
155-
System.err.println(x);
156-
}
157-
158-
/**
159-
* Build the exception to raise on invalid arguments.
160-
*
161-
* @param format string format
162-
* @param args optional arguments for the string
163-
* @return a new exception to throw
164-
*/
165-
protected static ExitUtil.ExitException invalidArgs(
166-
String format, Object... args) {
167-
return exitException(INVALID_ARGUMENT, format, args);
168-
}
169-
170-
/**
171-
* Build a exception to throw with a formatted message.
172-
*
173-
* @param exitCode exit code to use
174-
* @param format string format
175-
* @param args optional arguments for the string
176-
* @return a new exception to throw
177-
*/
178-
protected static ExitUtil.ExitException exitException(
179-
final int exitCode,
180-
final String format,
181-
final Object... args) {
182-
return new ExitUtil.ExitException(exitCode,
183-
String.format(format, args));
184-
}
185155

186156
/**
187-
* Flush all active output channels, including {@Code System.err},
157+
* Flush all active output channels, including {@code System.err},
188158
* so as to stay in sync with any JRE log messages.
189159
*/
190160
private void flush() {
@@ -196,44 +166,12 @@ private void flush() {
196166
System.err.flush();
197167
}
198168

199-
@Override
200-
public void close() throws IOException {
169+
170+
public void closeOutput() throws IOException {
201171
flush();
202172
if (out != null) {
203173
out.close();
204174
}
205175
}
206176

207-
/**
208-
* Inner entry point, with no logging or system exits.
209-
*
210-
* @param conf configuration
211-
* @param argv argument list
212-
* @return an exception
213-
* @throws Exception Exception.
214-
*/
215-
public static int exec(Configuration conf, String... argv) throws Exception {
216-
try (AuditTool auditTool = new AuditTool()) {
217-
return ToolRunner.run(conf, auditTool, argv);
218-
}
219-
}
220-
221-
/**
222-
* Main entry point.
223-
*
224-
* @param argv args list
225-
*/
226-
public static void main(String[] argv) {
227-
try {
228-
ExitUtil.terminate(exec(new Configuration(), argv));
229-
} catch (ExitUtil.ExitException e) {
230-
LOG.error("Exception while Terminating the command ran :{}",
231-
e.toString());
232-
System.exit(e.status);
233-
} catch (Exception e) {
234-
LOG.error("Exception while Terminating the command ran :{}",
235-
e.toString(), e);
236-
ExitUtil.halt(-1, e);
237-
}
238-
}
239177
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@
4343
import org.apache.hadoop.io.LongWritable;
4444
import org.apache.hadoop.io.Text;
4545
import org.apache.hadoop.mapred.LineRecordReader;
46+
import org.apache.hadoop.util.DurationInfo;
4647

48+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
49+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
4750
import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
4851
import static org.apache.hadoop.fs.s3a.audit.S3LogParser.BYTESSENT_GROUP;
4952
import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
@@ -65,9 +68,30 @@ public class S3AAuditLogMergerAndParser {
6568
private static final String REFERRER_HEADER_KEY = "referrer";
6669

6770
// Basic parsing counters.
71+
72+
/**
73+
* Number of audit log files parsed.
74+
*/
75+
private long logFilesParsed = 0;
76+
77+
/**
78+
* Number of log entries parsed.
79+
*/
6880
private long auditLogsParsed = 0;
81+
82+
83+
84+
85+
/**
86+
* How many referrer headers were parsed.
87+
*/
6988
private long referrerHeaderLogParsed = 0;
7089

90+
/**
91+
* How many records were skipped due to lack of referrer or other reason.
92+
*/
93+
private long recordsSkipped = 0;
94+
7195
/**
7296
* parseAuditLog method helps in parsing the audit log
7397
* into key-value pairs using regular expressions.
@@ -78,7 +102,7 @@ public class S3AAuditLogMergerAndParser {
78102
public HashMap<String, String> parseAuditLog(String singleAuditLog) {
79103
HashMap<String, String> auditLogMap = new HashMap<>();
80104
if (singleAuditLog == null || singleAuditLog.isEmpty()) {
81-
LOG.info("This is an empty string or null string, expected a valid string to parse");
105+
LOG.debug("This is an empty string or null string, expected a valid string to parse");
82106
return auditLogMap;
83107
}
84108
final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
@@ -107,11 +131,10 @@ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
107131
* audit log.
108132
* @return returns a map which contains key-value pairs of referrer headers
109133
*/
110-
public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
134+
public static HashMap<String, String> parseReferrerHeader(String referrerHeader) {
111135
HashMap<String, String> referrerHeaderMap = new HashMap<>();
112136
if (referrerHeader == null || referrerHeader.isEmpty()) {
113-
LOG.info(
114-
"This is an empty string or null string, expected a valid string to parse");
137+
LOG.debug("This is an empty string or null string, expected a valid string to parse");
115138
return referrerHeaderMap;
116139
}
117140

@@ -163,23 +186,26 @@ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
163186
Path logsPath,
164187
Path destPath) throws IOException {
165188

166-
// Listing file in given path
189+
// List source log files
167190
RemoteIterator<LocatedFileStatus> listOfLogFiles =
168191
fileSystem.listFiles(logsPath, true);
169192

170-
Path destFile = destPath;
193+
Path destFile = new Path(destPath, "AuditLogFile.avro");
171194

172195
try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
173196

174197
// Iterating over the list of files to merge and parse
175198
while (listOfLogFiles.hasNext()) {
199+
logFilesParsed++;
176200
FileStatus fileStatus = listOfLogFiles.next();
177201
int fileLength = (int) fileStatus.getLen();
178202
byte[] byteBuffer = new byte[fileLength];
179203

180-
try (FSDataInputStream fsDataInputStream =
204+
try (DurationInfo duration = new DurationInfo(LOG, "Processing %s", fileStatus.getPath());
205+
FSDataInputStream fsDataInputStream =
181206
awaitFuture(fileSystem.openFile(fileStatus.getPath())
182207
.withFileStatus(fileStatus)
208+
.opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
183209
.build())) {
184210

185211
// Instantiating generated AvroDataRecord class
@@ -226,6 +252,7 @@ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
226252
if (StringUtils.isBlank(referrerHeader) || referrerHeader
227253
.equals("-")) {
228254
LOG.debug("Invalid referrer header for this audit log...");
255+
recordsSkipped++;
229256
continue;
230257
}
231258

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.hadoop.fs.s3a.Constants;
5252
import org.apache.hadoop.fs.s3a.S3AFileSystem;
5353
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
54+
import org.apache.hadoop.fs.s3a.audit.AuditTool;
5455
import org.apache.hadoop.fs.s3a.auth.RolePolicies;
5556
import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens;
5657
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -992,6 +993,9 @@ public static int run(Configuration conf, String... args) throws
992993
}
993994
switch (subCommand) {
994995

996+
case AuditTool.AUDIT:
997+
command = new AuditTool(conf);
998+
break;
995999
case BucketInfo.NAME:
9961000
command = new BucketInfo(conf);
9971001
break;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/usr/bin/env bash
2+
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
if ! declare -f hadoop_subcommand_s3a >/dev/null 2>/dev/null; then
19+
20+
if [[ "${HADOOP_SHELL_EXECNAME}" = hadoop ]]; then
21+
hadoop_add_subcommand "s3a" client "S3 Commands"
22+
fi
23+
24+
# this can't be indented otherwise shelldocs won't get it
25+
26+
## @description s3a command for hadoop
27+
## @audience public
28+
## @stability stable
29+
## @replaceable yes
30+
function hadoop_subcommand_s3a
31+
{
32+
# shellcheck disable=SC2034
33+
HADOOP_CLASSNAME=org.apache.hadoop.fs.s3a.s3guard.S3GuardTool
34+
hadoop_add_to_classpath_tools hadoop-aws
35+
}
36+
37+
fi

0 commit comments

Comments
 (0)