|
| 1 | +""" |
| 2 | +JSON data anonymization module. |
| 3 | +Replaces sensitive string values with generated fake data while keeping the JSON structure intact. |
| 4 | +""" |
| 5 | + |
| 6 | +import json |
| 7 | +import random |
| 8 | +from typing import Dict, Any, List, Optional, Union |
| 9 | +from faker import Faker |
| 10 | +import re |
| 11 | + |
| 12 | + |
class DataAnonymizer:
    """Replace sensitive string values in JSON-like data with fake ones.

    The structure of the input (dict keys, list lengths, non-string values)
    is preserved; only non-blank string values are replaced. The kind of
    replacement is chosen from the name of the field that holds the value,
    using case-insensitive substring matching against ``sensitive_patterns``.
    Strings whose field name matches no pattern are still replaced, with
    generic fake text of roughly similar length.
    """

    # (pattern category, pool name) pairs tested in this order; the first
    # category whose pattern occurs in the field name wins. 'street' is
    # tested before 'address' so that names such as 'street_address' reach
    # the street pool instead of being captured by the 'address' pattern.
    _SIMPLE_CATEGORIES = (
        ('firstName', 'firstNames'),
        ('lastName', 'lastNames'),
        ('email', 'emails'),
        ('phone', 'phones'),
        ('street', 'streets'),
        ('address', 'addresses'),
        ('city', 'cities'),
        ('postcode', 'postcodes'),
        ('country', 'countries'),
        ('company', 'companies'),
        ('url', 'urls'),
    )

    def __init__(self, locale: str = 'en_US', pool_size: int = 100):
        """
        Initialize the anonymizer with a specific locale.

        Args:
            locale: Locale passed to Faker (default 'en_US')
            pool_size: Number of fake values generated for each pool

        Raises:
            ValueError: If pool_size is smaller than 1
        """
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")

        self.fake = Faker(locale)
        fake = self.fake

        # One generator per pool. Pools are filled in this order, one pool
        # at a time, so the sequence of Faker calls is stable.
        generators = {
            'firstNames': fake.first_name,
            'lastNames': fake.last_name,
            'emails': fake.email,
            'phones': fake.phone_number,
            'addresses': fake.address,
            'streets': fake.street_address,
            'cities': fake.city,
            'postcodes': fake.postcode,
            'countries': fake.country,
            'companies': fake.company,
            'urls': fake.url,
            'sentences': fake.sentence,
            'paragraphs': fake.paragraph,
            'dates': lambda: fake.date_between(
                start_date='-30y', end_date='today'
            ).isoformat(),
            'datetimes': lambda: fake.date_time_between(
                start_date='-30y', end_date='now'
            ).isoformat(),
        }

        # Anonymized data pools
        self.pools = {
            name: [generate() for _ in range(pool_size)]
            for name, generate in generators.items()
        }

        # Substrings (matched against the lower-cased field name) that
        # identify each category of sensitive field
        self.sensitive_patterns = {
            'firstName': ['prenom', 'firstname', 'fname', 'given_name', 'first_name'],
            'lastName': ['nom', 'lastname', 'lname', 'surname', 'last_name', 'family_name'],
            'email': ['email', 'mail', 'e_mail', 'e-mail', 'adresse_email'],
            'phone': ['telephone', 'phone', 'tel', 'mobile', 'cellphone', 'numero'],
            'address': ['adresse', 'address', 'addr'],
            'street': ['rue', 'street', 'street_address', 'voie'],
            'city': ['ville', 'city', 'localite'],
            'postcode': ['code_postal', 'postal_code', 'zip', 'zip_code', 'postcode', 'postalcode'],
            'country': ['pays', 'country', 'nation'],
            'company': ['entreprise', 'company', 'societe', 'organization'],
            'url': ['url', 'website', 'site', 'lien'],
            'description': ['description', 'commentaire', 'comment', 'note'],
            'date': ['date', 'created_at', 'updated_at', 'created', 'updated'],
            'datetime': ['datetime', 'timestamp', 'time'],
        }

    def anonymize_json(self, data: Union[Dict, List, str]) -> Union[Dict, List, str]:
        """
        Anonymize JSON data by replacing its string values with fake ones.

        Args:
            data: JSON data to anonymize (dict, list or JSON string)

        Returns:
            Anonymized data. A dict or list yields a new structure of the
            same shape. A JSON string yields a JSON string (indent=2,
            non-ASCII characters kept). A string that is not valid JSON is
            returned unchanged, i.e. NOT anonymized.
        """
        if not isinstance(data, str):
            return self._anonymize_recursive(data)

        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            # Best effort: text that is not JSON is handed back untouched.
            return data

        anonymized = self._anonymize_recursive(parsed)
        return json.dumps(anonymized, indent=2, ensure_ascii=False)

    def _anonymize_recursive(self, data: Any, field_name: Optional[str] = None) -> Any:
        """
        Recursively anonymize a data structure.

        Args:
            data: Data to anonymize
            field_name: Name of the closest enclosing dict key, if any. It
                is carried through lists so that strings stored in a list
                are anonymized like the field that holds the list.

        Returns:
            Anonymized data
        """
        if isinstance(data, dict):
            return {
                key: self._anonymize_recursive(value, str(key))
                for key, value in data.items()
            }

        if isinstance(data, list):
            return [self._anonymize_recursive(item, field_name) for item in data]

        if isinstance(data, str):
            # Empty / whitespace-only strings carry no information.
            if not data.strip():
                return data
            if field_name is None:
                return self._anonymize_generic_string(data)
            return self._anonymize_field(field_name, data)

        # Keep other types as is (numbers, booleans, null)
        return data

    def _matches(self, field_name_lower: str, category: str) -> bool:
        """Return True if any pattern of the category occurs in the name."""
        patterns = self.sensitive_patterns.get(category, ())
        return any(pattern in field_name_lower for pattern in patterns)

    def _pick(self, pool_name: str, value: str) -> str:
        """Return a random pool entry, or generic text if the pool is empty."""
        pool = self.pools.get(pool_name)
        if not pool:
            return self._anonymize_generic_string(value)
        return random.choice(pool)

    def _anonymize_field(self, field_name: str, value: str) -> str:
        """
        Anonymize a field based on its name and value.

        Args:
            field_name: Field name
            value: Field value

        Returns:
            Anonymized value
        """
        name = str(field_name).lower()

        for category, pool_name in self._SIMPLE_CATEGORIES:
            if self._matches(name, category):
                return self._pick(pool_name, value)

        # Description/comment: long texts get a paragraph, short ones a sentence
        if self._matches(name, 'description'):
            pool_name = 'paragraphs' if len(value) > 100 else 'sentences'
            return self._pick(pool_name, value)

        # Date: try to preserve the format (date only vs date and time)
        if self._matches(name, 'date'):
            has_time_part = 'T' in value or ':' in value
            return self._pick('datetimes' if has_time_part else 'dates', value)

        # Datetime ('timestamp', 'time', ...)
        if self._matches(name, 'datetime'):
            return self._pick('datetimes', value)

        # Default: replace with generic data
        return self._anonymize_generic_string(value)

    def _anonymize_generic_string(self, value: str) -> str:
        """
        Anonymize a generic string.

        Args:
            value: Value to anonymize

        Returns:
            A fake word, short sentence or paragraph, chosen from the
            length of the original value
        """
        # Preserve approximate length
        length = len(value)
        if length <= 10:
            return self.fake.word()
        if length <= 50:
            return self.fake.sentence(nb_words=3)
        return self.fake.paragraph(nb_sentences=2)

    def add_to_pool(self, pool_name: str, values: List[str]):
        """
        Add values to an anonymization pool.

        The pool is created if it does not exist yet.

        Args:
            pool_name: Pool name
            values: Values to add
        """
        # A bare string would otherwise be added one character at a time.
        if isinstance(values, str):
            values = [values]

        self.pools.setdefault(pool_name, []).extend(values)

    def get_sensitive_fields(self, data: Union[Dict, List, str]) -> List[str]:
        """
        Analyze data to identify sensitive fields.

        Args:
            data: Data to analyze (dict, list or JSON string)

        Returns:
            Paths of the sensitive fields (e.g. 'user.email',
            'items[0].phone'), without duplicates, in the order they are
            encountered. An invalid JSON string yields an empty list.
        """
        # If it's a JSON string, parse it
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                return []

        found: List[str] = []
        self._find_sensitive_fields_recursive(data, found)
        # dict.fromkeys removes duplicates while keeping discovery order.
        return list(dict.fromkeys(found))

    def _find_sensitive_fields_recursive(
        self,
        data: Any,
        sensitive_fields: List[str],
        prefix: str = "",
        field_name: Optional[str] = None,
    ):
        """
        Recursively find sensitive fields in data.

        Args:
            data: Data to analyze
            sensitive_fields: List that receives the sensitive field paths
            prefix: Path of ``data`` within the whole structure
            field_name: Name of the closest enclosing dict key, if any; it
                decides whether a string found at ``prefix`` is sensitive
        """
        if isinstance(data, dict):
            for key, value in data.items():
                field_path = f"{prefix}.{key}" if prefix else str(key)
                self._find_sensitive_fields_recursive(
                    value, sensitive_fields, field_path, str(key)
                )

        elif isinstance(data, list):
            for index, item in enumerate(data):
                self._find_sensitive_fields_recursive(
                    item, sensitive_fields, f"{prefix}[{index}]", field_name
                )

        elif isinstance(data, str) and data.strip():
            # Only non-blank strings are reported, mirroring anonymization.
            if field_name is not None and self._is_sensitive_field(field_name):
                sensitive_fields.append(prefix)

    def _is_sensitive_field(self, field_name: str) -> bool:
        """
        Check if a field is considered sensitive.

        Args:
            field_name: Field name to check

        Returns:
            True if any pattern of any category occurs in the lower-cased
            field name
        """
        name = str(field_name).lower()
        return any(
            pattern in name
            for patterns in self.sensitive_patterns.values()
            for pattern in patterns
        )
| 282 | + |
0 commit comments