From c87788f64d0b00890087f91ce6d06426cc5518b7 Mon Sep 17 00:00:00 2001 From: Chris Huang Date: Wed, 11 Dec 2013 15:30:20 +0800 Subject: [PATCH 1/2] provide schema data for PcapLoader --- .../loaders/pcap/packet/PacketLoader.java | 86 ++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketLoader.java b/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketLoader.java index a08c220..8a7dcb7 100644 --- a/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketLoader.java +++ b/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketLoader.java @@ -3,14 +3,70 @@ import com.packetloop.packetpig.loaders.pcap.PcapLoader; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.Expression; +import org.apache.pig.LoadMetadata; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import java.io.IOException; -public class PacketLoader extends PcapLoader { +public class PacketLoader extends PcapLoader implements LoadMetadata { + + static final String[] NAMES = { + "ts", + "ip_version", "ip_header_length", "ip_tos", "ip_total_length", + "ip_id", "ip_flags", "ip_frag_offset", "ip_ttl", "ip_proto", "ip_checksum", + "ip_src", "ip_dst", + "tcp_sport", "tcp_dport", "tcp_seq_id", "tcp_ack_id", + "tcp_offset", "tcp_ns", "tcp_cwr", "tcp_ece", "tcp_urg", "tcp_ack", "tcp_psh", + "tcp_rst", "tcp_syn", "tcp_fin", "tcp_window", "tcp_len", + "udp_sport", "udp_dport", "udp_len", "udp_checksum" + }; + + static final Byte[] TYPES = { + DataType.LONG, // "ts" + DataType.INTEGER, // "ip_version" + DataType.INTEGER, // "ip_header_length" + DataType.INTEGER, // "ip_tos" + DataType.INTEGER, // "ip_total_length" + DataType.INTEGER, // "ip_id" + DataType.INTEGER, // "ip_flags" + DataType.INTEGER, // "ip_frag_offset" + DataType.INTEGER, // "ip_ttl" + DataType.INTEGER, // "ip_proto" + DataType.INTEGER, // "ip_checksum" + DataType.CHARARRAY, // "ip_src" + DataType.CHARARRAY, // "ip_dst" + DataType.INTEGER, // "tcp_sport" + DataType.INTEGER, // "tcp_dport" + DataType.LONG, // "tcp_seq_id" + DataType.LONG, // "tcp_ack_id" + DataType.INTEGER, // "tcp_offset" + DataType.INTEGER, // "tcp_ns" + DataType.INTEGER, // "tcp_cwr" + DataType.INTEGER, // "tcp_ece" + DataType.INTEGER, // "tcp_urg" + DataType.INTEGER, // "tcp_ack" + DataType.INTEGER, // "tcp_psh" + DataType.INTEGER, // "tcp_rst" + DataType.INTEGER, // "tcp_syn" + DataType.INTEGER, // "tcp_fin" + DataType.INTEGER, // "tcp_window" + DataType.INTEGER, // "tcp_len" + + DataType.INTEGER, // "udp_sport" + DataType.INTEGER, // "udp_dport" + DataType.INTEGER, // "udp_len" + DataType.CHARARRAY, // "udp_checksum" + }; + @Override public InputFormat getInputFormat() throws IOException { return new FileInputFormat() { @@ -20,4 +76,32 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemp } }; } + + @Override + public ResourceSchema getSchema(String location, Job job) throws IOException { + ResourceFieldSchema[] fields = new ResourceFieldSchema[NAMES.length]; + for (int i = 0; i < NAMES.length; i++) { + ResourceFieldSchema field = new ResourceFieldSchema(); + field.setName(NAMES[i]); + field.setType(TYPES[i]); + fields[i] = field; + } + ResourceSchema schema = new ResourceSchema(); + schema.setFields(fields); + return schema; + } + + @Override + public String[] getPartitionKeys(String location, Job job) throws IOException { + return null; + } + + @Override + public ResourceStatistics getStatistics(String arg0, Job arg1) throws IOException { + return null; + } + + @Override + public void setPartitionFilter(Expression arg0) throws IOException { + } } From 48b7cd121c0d38f068cbcc64702f9f828e96d725 Mon Sep 17 00:00:00 2001 From: Chris Huang Date: Wed, 11 Dec 2013 15:32:54 +0800 Subject: [PATCH 2/2] provide schema data for PacketNgramRecordReader --- .../pcap/packet/PacketNgramLoader.java | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketNgramLoader.java b/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketNgramLoader.java index 45c4fc3..4d14530 100644 --- a/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketNgramLoader.java +++ b/lib/packetpig/src/main/java/com/packetloop/packetpig/loaders/pcap/packet/PacketNgramLoader.java @@ -3,17 +3,35 @@ import com.packetloop.packetpig.loaders.pcap.PcapLoader; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.Expression; +import org.apache.pig.LoadMetadata; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import java.io.IOException; -public class PacketNgramLoader extends PcapLoader { +public class PacketNgramLoader extends PcapLoader implements LoadMetadata { private int n; private String filter; + static final String[] NAMES = { + "key", "filter", "int_value", "count", + }; + + static final Byte[] TYPES = { + DataType.LONG, // "key" + DataType.CHARARRAY, // "filter" + DataType.INTEGER, // "int_value" + DataType.INTEGER, // "count" + }; + public PacketNgramLoader(String filter, String n) { this.filter = filter; this.n = Integer.parseInt(n); @@ -28,4 +46,32 @@ public RecordReader createRecordReader(InputSplit split, TaskAttemp } }; } + + @Override + public ResourceSchema getSchema(String location, Job job) throws IOException { + ResourceFieldSchema[] fields = new ResourceFieldSchema[NAMES.length]; + for (int i = 0; i < NAMES.length; i++) { + ResourceFieldSchema field = new ResourceFieldSchema(); + field.setName(NAMES[i]); + field.setType(TYPES[i]); + fields[i] = field; + } + ResourceSchema schema = new ResourceSchema(); + schema.setFields(fields); + return schema; + } + + @Override + public String[] getPartitionKeys(String location, Job job) throws IOException { + return null; + } + + @Override + public ResourceStatistics getStatistics(String arg0, Job arg1) throws IOException { + return null; + } + + @Override + public void setPartitionFilter(Expression arg0) throws IOException { + } }