|
30 | 30 | import com.fasterxml.jackson.databind.JsonNode; |
31 | 31 | import com.fasterxml.jackson.databind.ObjectMapper; |
32 | 32 | import com.fasterxml.jackson.databind.node.ObjectNode; |
33 | | -import org.apache.log4j.LogManager; |
34 | | -import org.apache.log4j.Logger; |
35 | 33 | import org.apache.spark.api.java.JavaRDD; |
36 | 34 |
|
37 | 35 | import java.util.Locale; |
|
50 | 48 | */ |
51 | 49 | public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor { |
52 | 50 |
|
53 | | - private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class); |
54 | | - |
55 | 51 | private static final ObjectMapper MAPPER = new ObjectMapper(); |
56 | 52 |
|
| 53 | + private final String databaseRegex; |
| 54 | + private final String tableRegex; |
| 55 | + |
57 | 56 | public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) { |
58 | 57 | super(props); |
| 58 | + databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key(), null); |
| 59 | + tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key()); |
59 | 60 | } |
60 | 61 |
|
61 | 62 | // ------------------------------------------------------------------------ |
@@ -112,9 +113,6 @@ public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) { |
112 | 113 |
|
113 | 114 | // filter out target databases and tables |
114 | 115 | if (isTargetTable(database, table)) { |
115 | | - |
116 | | - LOG.info(String.format("Maxwell source processor starts process table : %s.%s", database, table)); |
117 | | - |
118 | 116 | ObjectNode result = (ObjectNode) inputJson.get(DATA); |
119 | 117 | String type = inputJson.get(OPERATION_TYPE).textValue(); |
120 | 118 |
|
@@ -183,9 +181,6 @@ private String processDelete(JsonNode inputJson, ObjectNode result) { |
183 | 181 | * @param table table the data belong to |
184 | 182 | */ |
185 | 183 | private boolean isTargetTable(String database, String table) { |
186 | | - String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key(), null); |
187 | | - String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key()); |
188 | | - |
189 | 184 | if (StringUtils.isNullOrEmpty(databaseRegex)) { |
190 | 185 | return Pattern.matches(tableRegex, table); |
191 | 186 | } else { |
|
0 commit comments