/*
 * Copyright (c) 2015-2020, Oracle and/or its affiliates. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.tribuo.data.columnar;

import com.oracle.labs.mlrg.olcut.config.Config;
import com.oracle.labs.mlrg.olcut.config.Configurable;
import com.oracle.labs.mlrg.olcut.config.PropertyException;
import com.oracle.labs.mlrg.olcut.provenance.ConfiguredObjectProvenance;
import com.oracle.labs.mlrg.olcut.provenance.Provenancable;
import com.oracle.labs.mlrg.olcut.provenance.impl.ConfiguredObjectProvenanceImpl;
import org.tribuo.Example;
import org.tribuo.ImmutableFeatureMap;
import org.tribuo.Model;
import org.tribuo.Output;
import org.tribuo.VariableInfo;
import org.tribuo.impl.ArrayExample;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * A processor which takes a Map of String to String and returns an {@link Example}.
 * <p>
 * It accepts a {@link ResponseProcessor} which converts the response field into an {@link Output},
 * a Map of {@link FieldProcessor}s which converts fields into {@link ColumnarFeature}s, and a Set
 * of {@link FeatureProcessor}s which processes the list of {@link ColumnarFeature}s before {@link Example}
 * construction. Optionally metadata and weights can be extracted using {@link FieldExtractor}s
 * and written into each example as they are constructed.
 * <p>
 * If the metadata extractors are invalid (i.e., two extractors write to the same metadata key),
 * the RowProcessor throws {@link PropertyException}.
 * <p>
 * If regex mapping processors are supplied, one of the {@code expandRegexMapping} methods must be
 * called before features can be generated, otherwise {@link #generateFeatures(Map)} throws
 * {@link IllegalStateException}.
 */
public class RowProcessor<T extends Output<T>> implements Configurable, Provenancable<ConfiguredObjectProvenance> {

    private static final Logger logger = Logger.getLogger(RowProcessor.class.getName());

    // A character class built from the two separator strings; used to recover the field name prefix of a feature name.
    private static final String FEATURE_NAME_REGEX = "["+ColumnarFeature.JOINER+FieldProcessor.NAMESPACE+"]";

    private static final Pattern FEATURE_NAME_PATTERN = Pattern.compile(FEATURE_NAME_REGEX);

    @Config(description="Extractors for the example metadata.")
    private List<FieldExtractor<?>> metadataExtractors = Collections.emptyList();

    @Config(description="Extractor for the example weight.")
    protected FieldExtractor<Float> weightExtractor = null;

    @Config(mandatory = true,description="Processor which extracts the response.")
    protected ResponseProcessor<T> responseProcessor;

    @Config(mandatory = true,description="The list of field processors to use.")
    private List<FieldProcessor> fieldProcessorList;

    // fieldProcessorList is unpacked into this map to make the config files less complex.
    // fieldProcessorMap is the store of record for field processors.
    protected Map<String,FieldProcessor> fieldProcessorMap;

    @Config(description="A set of feature processors to apply after extraction.")
    private Set<FeatureProcessor> featureProcessors = new HashSet<>();

    @Config(description="A map from a regex to field processors to apply to fields matching the regex.")
    protected Map<String,FieldProcessor> regexMappingProcessors = new HashMap<>();

    // True once there are no unexpanded regex mappings left, i.e. features can be generated.
    protected boolean configured;

    /**
     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
     * <p>
     * This processor does not generate any additional metadata for the examples, nor does it set the
     * weight value on generated examples.
     * @param responseProcessor The response processor to use.
     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
     */
    public RowProcessor(ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap) {
        this(Collections.emptyList(),null,responseProcessor,fieldProcessorMap,Collections.emptySet());
    }

    /**
     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
     * <p>
     * After extraction the features are then processed using the supplied set of feature processors.
     * These processors can be used to insert conjunction features which are triggered when
     * multiple features appear, or to filter out unnecessary features.
     * <p>
     * This processor does not generate any additional metadata for the examples, nor does it set the
     * weight value on generated examples.
     * @param responseProcessor The response processor to use.
     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
     * @param featureProcessors The feature processors to run on each extracted feature list.
     */
    public RowProcessor(ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap, Set<FeatureProcessor> featureProcessors) {
        this(Collections.emptyList(),null,responseProcessor,fieldProcessorMap,featureProcessors);
    }

    /**
     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
     * <p>
     * After extraction the features are then processed using the supplied set of feature processors.
     * These processors can be used to insert conjunction features which are triggered when
     * multiple features appear, or to filter out unnecessary features.
     * <p>
     * Additionally this processor can extract a weight from each row and insert it into the example, along
     * with more general metadata fields (e.g., the row number, date stamps). The weightExtractor can be null,
     * and if so the weights are left unset.
     * @param metadataExtractors The metadata extractors to run per example. If two metadata extractors emit
     *                           the same metadata name then the constructor throws a PropertyException.
     * @param weightExtractor The weight extractor, if null the weights are left unset at their default.
     * @param responseProcessor The response processor to use.
     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
     * @param featureProcessors The feature processors to run on each extracted feature list.
     */
    public RowProcessor(List<FieldExtractor<?>> metadataExtractors, FieldExtractor<Float> weightExtractor,
                        ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap,
                        Set<FeatureProcessor> featureProcessors) {
        this(metadataExtractors,weightExtractor,responseProcessor,fieldProcessorMap,Collections.emptyMap(),featureProcessors);
    }

    /**
     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
     * <p>
     * In addition this processor can instantiate field processors which match the regexes supplied in
     * the regexMappingProcessors. If a regex matches a field which already has a fieldProcessor assigned to
     * it, it throws an IllegalArgumentException.
     * <p>
     * After extraction the features are then processed using the supplied set of feature processors.
     * These processors can be used to insert conjunction features which are triggered when
     * multiple features appear, or to filter out unnecessary features.
     * <p>
     * Additionally this processor can extract a weight from each row and insert it into the example, along
     * with more general metadata fields (e.g., the row number, date stamps). The weightExtractor can be null,
     * and if so the weights are left unset.
     * <p>
     * The supplied collections are copied, so later changes to the arguments do not affect this processor.
     * @param metadataExtractors The metadata extractors to run per example. If two metadata extractors emit
     *                           the same metadata name then the constructor throws a PropertyException.
     * @param weightExtractor The weight extractor, if null the weights are left unset at their default.
     * @param responseProcessor The response processor to use.
     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
     * @param regexMappingProcessors A set of field processors which can be instantiated if the regexes match the field names.
     * @param featureProcessors The feature processors to run on each extracted feature list.
     */
    public RowProcessor(List<FieldExtractor<?>> metadataExtractors, FieldExtractor<Float> weightExtractor,
                        ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap,
                        Map<String,FieldProcessor> regexMappingProcessors, Set<FeatureProcessor> featureProcessors) {
        this.metadataExtractors = metadataExtractors.isEmpty() ? Collections.emptyList() : new ArrayList<>(metadataExtractors);
        this.weightExtractor = weightExtractor;
        this.responseProcessor = responseProcessor;
        this.fieldProcessorMap = new HashMap<>(fieldProcessorMap);
        this.regexMappingProcessors = regexMappingProcessors.isEmpty() ? Collections.emptyMap() : new HashMap<>(regexMappingProcessors);
        this.featureProcessors.addAll(featureProcessors);
        // Validates the metadata extractors and populates fieldProcessorList from the map.
        postConfig();
    }

    /**
     * For olcut.
     */
    protected RowProcessor() {}

    /**
     * Used by the OLCUT configuration system, and should not be called by external code.
     * <p>
     * Synchronises the field processor list and map (the list wins if it is set), marks the
     * processor as configured when there are no regex mappings to expand, and checks that no
     * two metadata extractors share a metadata name.
     * @throws PropertyException If two metadata extractors reference the same metadata name.
     */
    @Override
    public void postConfig() {
        configured = regexMappingProcessors.isEmpty();
        if (fieldProcessorList != null) {
            // Configuration path: the list was injected, so derive the map from it.
            fieldProcessorMap = fieldProcessorList.stream().collect(Collectors.toMap(FieldProcessor::getFieldName, Function.identity()));
        } else {
            // Constructor path: the map was supplied, so derive the list from it.
            fieldProcessorList = new ArrayList<>();
            fieldProcessorList.addAll(fieldProcessorMap.values());
        }
        Set<String> metadataNames = new HashSet<>();
        for (FieldExtractor<?> extractor : metadataExtractors) {
            String newMetadataName = extractor.getMetadataName();
            // Set.add returns false when the name has already been seen.
            if (!metadataNames.add(newMetadataName)) {
                throw new PropertyException("","metadataExtractors",
                        "Two metadata extractors found referencing the same metadata name '" + newMetadataName + "'");
            }
        }
    }

    /**
     * Returns the response processor this RowProcessor uses.
     * @return The response processor.
     */
    public ResponseProcessor<T> getResponseProcessor() {
        return responseProcessor;
    }

    /**
     * Returns the map of {@link FieldProcessor}s this RowProcessor uses.
     * @return An unmodifiable view of the field processors, keyed by field name.
     */
    public Map<String,FieldProcessor> getFieldProcessors() {
        return Collections.unmodifiableMap(fieldProcessorMap);
    }

    /**
     * Returns the set of {@link FeatureProcessor}s this RowProcessor uses.
     * @return An unmodifiable view of the feature processors.
     */
    public Set<FeatureProcessor> getFeatureProcessors() {
        return Collections.unmodifiableSet(featureProcessors);
    }

    /**
     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
     * there are no features, or the response is required but it was not found. The latter case is
     * used at training time.
     * @param row The row to process.
     * @param outputRequired If an Output must be found in the row to return an Example.
     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
     */
    public Optional<Example<T>> generateExample(ColumnarIterator.Row row, boolean outputRequired) {
        String responseValue = row.getRowData().get(responseProcessor.getFieldName());
        Optional<T> labelOpt = responseProcessor.process(responseValue);
        if (!labelOpt.isPresent() && outputRequired) {
            return Optional.empty();
        }

        List<ColumnarFeature> features = generateFeatures(row.getRowData());

        if (features.isEmpty()) {
            logger.warning(String.format("Row %d empty of features, omitting", row.getIndex()));
            return Optional.empty();
        }

        // Only ask the output factory for the unknown output when no response was extracted.
        T label = labelOpt.orElseGet(() -> responseProcessor.getOutputFactory().getUnknownOutput());

        Map<String,Object> metadata = generateMetadata(row);

        Example<T> example;
        if (weightExtractor == null) {
            example = new ArrayExample<>(label,metadata);
        } else {
            example = new ArrayExample<>(label,
                    weightExtractor.extract(row).orElse(Example.DEFAULT_WEIGHT),
                    metadata);
        }
        example.addAll(features);
        return Optional.of(example);
    }

    /**
     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
     * there are no features, or the response is required but it was not found.
     * <p>
     * Supplies -1 as the example index, used in cases where the index isn't meaningful.
     * @param row The row to process.
     * @param outputRequired If an Output must be found in the row to return an Example.
     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
     */
    public Optional<Example<T>> generateExample(Map<String,String> row, boolean outputRequired) {
        return generateExample(-1,row,outputRequired);
    }

    /**
     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
     * there are no features, or the response is required but it was not found. The latter case is
     * used at training time.
     * @param idx The index for use in the example metadata if desired.
     * @param row The row to process.
     * @param outputRequired If an Output must be found in the row to return an Example.
     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
     */
    public Optional<Example<T>> generateExample(long idx, Map<String,String> row, boolean outputRequired) {
        return generateExample(new ColumnarIterator.Row(idx, new ArrayList<>(row.keySet()), row), outputRequired);
    }

    /**
     * Generates the example metadata from the supplied row and index.
     * <p>
     * Extractors which fail to produce a value are logged at warning level and omitted from the map.
     * @param row The row to process.
     * @return A (possibly empty) map containing the metadata.
     */
    public Map<String,Object> generateMetadata(ColumnarIterator.Row row) {
        if (metadataExtractors.isEmpty()) {
            return Collections.emptyMap();
        }

        Map<String,Object> metadataMap = new HashMap<>();
        long idx = row.getIndex();

        for (FieldExtractor<?> field : metadataExtractors) {
            String metadataName = field.getMetadataName();
            Optional<?> extractedValue = field.extract(row);
            if (extractedValue.isPresent()) {
                metadataMap.put(metadataName, extractedValue.get());
            } else {
                logger.warning("Failed to extract field with name " + metadataName + " from index " + idx);
            }
        }

        return metadataMap;
    }

    /**
     * Generates the features from the supplied row.
     * <p>
     * Fields which are absent from the row are skipped. Newlines in a field value are replaced with
     * spaces and the value is trimmed before it is passed to the field processor. The feature
     * processors are then applied in turn to the complete feature list.
     * @param row The row to process.
     * @return A (possibly empty) list of {@link ColumnarFeature}s.
     * @throws IllegalStateException If there are regex mappings which have not been expanded.
     */
    public List<ColumnarFeature> generateFeatures(Map<String,String> row) {
        if (!configured) {
            throw new IllegalStateException("expandRegexMapping not called, yet there are entries in regexMappingProcessors which have not been bound to a field name.");
        }
        List<ColumnarFeature> features = new ArrayList<>();

        for (Map.Entry<String,FieldProcessor> e : fieldProcessorMap.entrySet()) {
            String value = row.get(e.getKey());
            if (value != null) {
                value = value.replace('\n', ' ').trim();
                features.addAll(e.getValue().process(value));
            }
        }

        for (FeatureProcessor f : featureProcessors) {
            features = f.process(features);
        }

        return features;
    }

    /**
     * The set of column names this will use for the feature processing.
     * @return An unmodifiable view of the column names it processes.
     */
    public Set<String> getColumnNames() {
        return Collections.unmodifiableSet(fieldProcessorMap.keySet());
    }

    /**
     * Returns a description of the row processor and its fields.
     * <p>
     * The regex mapping processors are only included while there are unexpanded regexes.
     * @return A String description of the RowProcessor.
     */
    public String getDescription() {
        String weightExtractorStr = weightExtractor == null ? "null" : weightExtractor.toString();
        StringBuilder description = new StringBuilder();
        description.append("RowProcessor(responseProcessor=").append(responseProcessor.toString());
        description.append(",fieldProcessorMap=").append(fieldProcessorMap.toString());
        if (!configured && !regexMappingProcessors.isEmpty()) {
            description.append(",regexMappingProcessors=").append(regexMappingProcessors.toString());
        }
        description.append(",featureProcessors=").append(featureProcessors.toString());
        description.append(",metadataExtractors=").append(metadataExtractors.toString());
        description.append(",weightExtractor=").append(weightExtractorStr).append(")");
        return description.toString();
    }

    @Override
    public String toString() {
        return getDescription();
    }

    /**
     * Returns the metadata keys and value types that are extracted
     * by this RowProcessor.
     * @return The metadata keys and value types.
     */
    public Map<String,Class<?>> getMetadataTypes() {
        if (metadataExtractors.isEmpty()) {
            return Collections.emptyMap();
        }

        Map<String, Class<?>> types = new HashMap<>();

        for (FieldExtractor<?> extractor : metadataExtractors) {
            types.put(extractor.getMetadataName(), extractor.getValueType());
        }

        return types;
    }

    /**
     * Returns true if the regexes have been expanded into field processors.
     * @return True if the RowProcessor has seen the set of input fields.
     */
    public boolean isConfigured() {
        return configured;
    }

    /**
     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
     * against the {@link ImmutableFeatureMap} contained in the supplied {@link Model}.
     * Throws an IllegalArgumentException if any regexes overlap with
     * themselves, or with the currently defined set of fieldProcessorMap.
     * @param model The model to use to expand the regexes.
     */
    public void expandRegexMapping(Model<T> model) {
        expandRegexMapping(model.getFeatureIDMap());
    }

    /**
     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
     * against the supplied feature map. Throws an IllegalArgumentException if any regexes overlap with
     * themselves, or with the currently defined set of fieldProcessorMap.
     * <p>
     * The field name of each feature is taken to be the part of the feature name before the first
     * separator character, and each distinct field name is checked once.
     * @param featureMap The feature map to use to expand the regexes.
     */
    public void expandRegexMapping(ImmutableFeatureMap featureMap) {
        // Several features can share a field name prefix, and expanding the same field name twice
        // is reported as a conflict, so collect the distinct names (in encounter order).
        Set<String> fieldNames = new LinkedHashSet<>();

        for (VariableInfo v : featureMap) {
            // A limit of 2 splits on the first separator only, leaving the field name in element 0.
            // (A limit of 1 never applies the pattern and returns the whole feature name.)
            String[] split = FEATURE_NAME_PATTERN.split(v.getName(),2);
            fieldNames.add(split[0]);
        }

        expandRegexMapping(fieldNames);
    }

    /**
     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
     * against the supplied list of field names. Throws an IllegalArgumentException if any regexes overlap with
     * themselves, or with the currently defined set of fieldProcessorMap or if there are unmatched regexes after
     * processing.
     * <p>
     * If this RowProcessor is already configured a warning is logged and nothing else happens.
     * @param fieldNames The list of field names.
     */
    public void expandRegexMapping(Collection<String> fieldNames) {
        if (configured) {
            logger.warning("RowProcessor was already configured, yet expandRegexMapping was called with " + fieldNames.toString());
            return;
        }

        Set<String> regexesMatchingFieldNames = partialExpandRegexMapping(fieldNames);

        if (regexesMatchingFieldNames.size() != regexMappingProcessors.size()) {
            throw new IllegalArgumentException("Failed to match all the regexes, found " + regexesMatchingFieldNames.size() + ", required " + regexMappingProcessors.size());
        }

        // Every regex has been bound to at least one field, so the regexes are no longer needed.
        regexMappingProcessors.clear();
        configured = true;
    }

    /**
     * Caveat Implementor! This method contains the logic of {@link org.tribuo.data.columnar.RowProcessor#expandRegexMapping}
     * without any of the checks that ensure the RowProcessor is in a valid state. This can be used in a subclass to expand a regex mapping
     * several times for a single instance of RowProcessor. The caller is responsible for ensuring that fieldNames are not duplicated
     * within or between calls.
     * <p>
     * If a regex matches a field which already has a field processor an IllegalArgumentException is thrown,
     * and the existing processor for that field is left in place. Field processors added for earlier
     * matches in the same call are not rolled back.
     * @param fieldNames The list of field names - should contain only previously unseen field names.
     * @return the set of regexes that were matched by fieldNames.
     */
    protected Set<String> partialExpandRegexMapping(Collection<String> fieldNames) {
        Set<String> regexesMatchingFieldNames = new HashSet<>();
        // Loop over all regexes
        for (Map.Entry<String,FieldProcessor> e : regexMappingProcessors.entrySet()) {
            Pattern p = Pattern.compile(e.getKey());
            // Loop over all field names
            for (String s : fieldNames) {
                // Check if the pattern matches the field name
                if (p.matcher(s).matches()) {
                    // Check for a clash before mutating anything, so a failure does not
                    // overwrite the processor which is already bound to this field.
                    FieldProcessor f = fieldProcessorMap.get(s);
                    if (f != null) {
                        throw new IllegalArgumentException("Regex " + p.toString() + " matched field " + s + " which already had a field processor " + f.toString());
                    }

                    // Add the field to the fieldProcessorMap map and the fieldProcessorList (for the provenance).
                    FieldProcessor newProcessor = e.getValue().copy(s);
                    fieldProcessorList.add(newProcessor);
                    fieldProcessorMap.put(s,newProcessor);

                    regexesMatchingFieldNames.add(e.getKey());
                }
            }
        }
        return regexesMatchingFieldNames;
    }

    /**
     * Returns a RowProcessor with clean state and the same configuration as this row processor.
     * <p>
     * When using regexMappingProcessors, RowProcessor is stateful in a way that can sometimes make it fail the second
     * time it is used. Concretely:
     * <pre>
     *     RowProcessor rp;
     *     Dataset ds1 = new MutableDataset(new CSVDataSource(csvfile1, rp));
     *     Dataset ds2 = new MutableDataset(new CSVDataSource(csvfile2, rp)); // this may fail due to state in rp
     * </pre>
     * NOTE(review): the copy is built from the current field processor map and regex map, so copying
     * after the regexes have been expanded carries the expanded field processors over - confirm that
     * callers copy before first use.
     * @return a RowProcessor instance with clean state and the same configuration as this row processor.
     * @deprecated In a future release this API will change, in the meantime this is the correct way to get a row
     * processor with clean state.
     */
    @Deprecated
    public RowProcessor<T> copy() {
        return new RowProcessor<>(metadataExtractors, weightExtractor, responseProcessor, fieldProcessorMap, regexMappingProcessors, featureProcessors);
    }

    @Override
    public ConfiguredObjectProvenance getProvenance() {
        return new ConfiguredObjectProvenanceImpl(this,"RowProcessor");
    }

}