001/*
002 * Copyright (c) 2015-2020, Oracle and/or its affiliates. All rights reserved.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.tribuo.data.columnar;
018
019import com.oracle.labs.mlrg.olcut.config.Config;
020import com.oracle.labs.mlrg.olcut.config.Configurable;
021import com.oracle.labs.mlrg.olcut.config.PropertyException;
022import com.oracle.labs.mlrg.olcut.provenance.ConfiguredObjectProvenance;
023import com.oracle.labs.mlrg.olcut.provenance.Provenancable;
024import com.oracle.labs.mlrg.olcut.provenance.impl.ConfiguredObjectProvenanceImpl;
025import org.tribuo.Example;
026import org.tribuo.ImmutableFeatureMap;
027import org.tribuo.Model;
028import org.tribuo.Output;
029import org.tribuo.VariableInfo;
030import org.tribuo.impl.ArrayExample;
031
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import java.util.Set;
041import java.util.function.Function;
042import java.util.logging.Logger;
043import java.util.regex.Pattern;
044import java.util.stream.Collectors;
045
046/**
047 * A processor which takes a Map of String to String and returns an {@link Example}.
048 * <p>
049 * It accepts a {@link ResponseProcessor} which converts the response field into an {@link Output},
050 * a Map of {@link FieldProcessor}s which converts fields into {@link ColumnarFeature}s, and a Set
051 * of {@link FeatureProcessor}s which processes the list of {@link ColumnarFeature}s before {@link Example}
052 * construction. Optionally metadata and weights can be extracted using {@link FieldExtractor}s
053 * and written into each example as they are constructed.
054 * <p>
055 * If the metadata extractors are invalid (i.e., two extractors write to the same metadata key),
056 * the RowProcessor throws {@link PropertyException}.
057 */
058public class RowProcessor<T extends Output<T>> implements Configurable, Provenancable<ConfiguredObjectProvenance> {
059
060    private static final Logger logger = Logger.getLogger(RowProcessor.class.getName());
061
062    private static final String FEATURE_NAME_REGEX = "["+ColumnarFeature.JOINER+FieldProcessor.NAMESPACE+"]";
063
064    private static final Pattern FEATURE_NAME_PATTERN = Pattern.compile(FEATURE_NAME_REGEX);
065
066    @Config(description="Extractors for the example metadata.")
067    private List<FieldExtractor<?>> metadataExtractors = Collections.emptyList();
068
069    @Config(description="Extractor for the example weight.")
070    protected FieldExtractor<Float> weightExtractor = null;
071
072    @Config(mandatory = true,description="Processor which extracts the response.")
073    protected ResponseProcessor<T> responseProcessor;
074
075    @Config(mandatory = true,description="The list of field processors to use.")
076    private List<FieldProcessor> fieldProcessorList;
077
078    // fieldProcessorList is unpacked into this map to make the config files less complex.
079    // fieldProcessorMap is the store of record for field processors.
080    protected Map<String,FieldProcessor> fieldProcessorMap;
081
082    @Config(description="A set of feature processors to apply after extraction.")
083    private Set<FeatureProcessor> featureProcessors = new HashSet<>();
084
085    @Config(description="A map from a regex to field processors to apply to fields matching the regex.")
086    protected Map<String,FieldProcessor> regexMappingProcessors = new HashMap<>();
087
088    protected boolean configured;
089
090    /**
091     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
092     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
093     * <p>
094     * This processor does not generate any additional metadata for the examples, nor does it set the
095     * weight value on generated examples.
096     * @param responseProcessor The response processor to use.
097     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
098     */
099    public RowProcessor(ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap) {
100        this(Collections.emptyList(),null,responseProcessor,fieldProcessorMap,Collections.emptySet());
101    }
102
103    /**
104     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
105     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
106     * <p>
107     * After extraction the features are then processed using the supplied set of feature processors.
108     * These processors can be used to insert conjunction features which are triggered when
109     * multiple features appear, or to filter out unnecessary features.
110     * <p>
111     * This processor does not generate any additional metadata for the examples, nor does it set the
112     * weight value on generated examples.
113     * @param responseProcessor The response processor to use.
114     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
115     * @param featureProcessors The feature processors to run on each extracted feature list.
116     */
117    public RowProcessor(ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap, Set<FeatureProcessor> featureProcessors) {
118        this(Collections.emptyList(),null,responseProcessor,fieldProcessorMap,featureProcessors);
119    }
120
121    /**
122     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
123     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
124     * <p>
125     * After extraction the features are then processed using the supplied set of feature processors.
126     * These processors can be used to insert conjunction features which are triggered when
127     * multiple features appear, or to filter out unnecessary features.
128     * <p>
129     * Additionally this processor can extract a weight from each row and insert it into the example, along
130     * with more general metadata fields (e.g., the row number, date stamps). The weightExtractor can be null,
131     * and if so the weights are left unset.
132     * @param metadataExtractors The metadata extractors to run per example. If two metadata extractors emit
133     *                           the same metadata name then the constructor throws a PropertyException.
134     * @param weightExtractor The weight extractor, if null the weights are left unset at their default.
135     * @param responseProcessor The response processor to use.
136     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
137     * @param featureProcessors The feature processors to run on each extracted feature list.
138     */
139    public RowProcessor(List<FieldExtractor<?>> metadataExtractors, FieldExtractor<Float> weightExtractor,
140                        ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap,
141                        Set<FeatureProcessor> featureProcessors) {
142        this(metadataExtractors,weightExtractor,responseProcessor,fieldProcessorMap,Collections.emptyMap(),featureProcessors);
143    }
144
145    /**
146     * Constructs a RowProcessor using the supplied responseProcessor to extract the response variable,
147     * and the supplied fieldProcessorMap to control which fields are parsed and how they are parsed.
148     * <p>
149     * In addition this processor can instantiate field processors which match the regexes supplied in
150     * the regexMappingProcessors. If a regex matches a field which already has a fieldProcessor assigned to
151     * it, it throws an IllegalArgumentException.
152     * <p>
153     * After extraction the features are then processed using the supplied set of feature processors.
154     * These processors can be used to insert conjunction features which are triggered when
155     * multiple features appear, or to filter out unnecessary features.
156     * <p>
157     * Additionally this processor can extract a weight from each row and insert it into the example, along
158     * with more general metadata fields (e.g., the row number, date stamps). The weightExtractor can be null,
159     * and if so the weights are left unset.
160     * @param metadataExtractors The metadata extractors to run per example. If two metadata extractors emit
161     *                           the same metadata name then the constructor throws a PropertyException.
162     * @param weightExtractor The weight extractor, if null the weights are left unset at their default.
163     * @param responseProcessor The response processor to use.
164     * @param fieldProcessorMap The keys are the field names and the values are the field processors to apply to those fields.
165     * @param regexMappingProcessors A set of field processors which can be instantiated if the regexes match the field names.
166     * @param featureProcessors The feature processors to run on each extracted feature list.
167     */
168    public RowProcessor(List<FieldExtractor<?>> metadataExtractors, FieldExtractor<Float> weightExtractor,
169                        ResponseProcessor<T> responseProcessor, Map<String,FieldProcessor> fieldProcessorMap,
170                        Map<String,FieldProcessor> regexMappingProcessors, Set<FeatureProcessor> featureProcessors) {
171        this.metadataExtractors = metadataExtractors.isEmpty() ? Collections.emptyList() : new ArrayList<>(metadataExtractors);
172        this.weightExtractor = weightExtractor;
173        this.responseProcessor = responseProcessor;
174        this.fieldProcessorMap = new HashMap<>(fieldProcessorMap);
175        this.regexMappingProcessors = regexMappingProcessors.isEmpty() ? Collections.emptyMap() : new HashMap<>(regexMappingProcessors);
176        this.featureProcessors.addAll(featureProcessors);
177        postConfig();
178    }
179
180    /**
181     * For olcut.
182     */
183    protected RowProcessor() {}
184
185    /**
186     * Used by the OLCUT configuration system, and should not be called by external code.
187     */
188    @Override
189    public void postConfig() {
190        configured = regexMappingProcessors.isEmpty();
191        if (fieldProcessorList != null) {
192            fieldProcessorMap = fieldProcessorList.stream().collect(Collectors.toMap(FieldProcessor::getFieldName, Function.identity()));
193        } else {
194            fieldProcessorList = new ArrayList<>();
195            fieldProcessorList.addAll(fieldProcessorMap.values());
196        }
197        Set<String> metadataNames = new HashSet<>();
198        for (FieldExtractor<?> extractor : metadataExtractors) {
199            String newMetadataName = extractor.getMetadataName();
200            if (metadataNames.contains(newMetadataName)) {
201                throw new PropertyException("","metadataExtractors",
202                        "Two metadata extractors found referencing the same metadata name '" + newMetadataName + "'");
203            } else {
204                metadataNames.add(newMetadataName);
205            }
206        }
207    }
208
209    /**
210     * Returns the response processor this RowProcessor uses.
211     * @return The response processor.
212     */
213    public ResponseProcessor<T> getResponseProcessor() {
214        return responseProcessor;
215    }
216
217    /**
218     * Returns the map of {@link FieldProcessor}s this RowProcessor uses.
219     * @return The field processors.
220     */
221    public Map<String,FieldProcessor> getFieldProcessors() {
222        return Collections.unmodifiableMap(fieldProcessorMap);
223    }
224
225    /**
226     * Returns the set of {@link FeatureProcessor}s this RowProcessor uses.
227     * @return The feature processors.
228     */
229    public Set<FeatureProcessor> getFeatureProcessors() {
230        return Collections.unmodifiableSet(featureProcessors);
231    }
232
233    /**
234     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
235     * there are no features, or the response is required but it was not found. The latter case is
236     * used at training time.
237     * @param row The row to process.
238     * @param outputRequired If an Output must be found in the row to return an Example.
239     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
240     */
241    public Optional<Example<T>> generateExample(ColumnarIterator.Row row, boolean outputRequired) {
242        String responseValue = row.getRowData().get(responseProcessor.getFieldName());
243        Optional<T> labelOpt = responseProcessor.process(responseValue);
244        if (!labelOpt.isPresent() && outputRequired) {
245            return Optional.empty();
246        }
247
248        List<ColumnarFeature> features = generateFeatures(row.getRowData());
249
250        if (features.isEmpty()) {
251            logger.warning(String.format("Row %d empty of features, omitting", row.getIndex()));
252            return Optional.empty();
253        } else {
254            T label = labelOpt.orElse(responseProcessor.getOutputFactory().getUnknownOutput());
255
256            Map<String,Object> metadata = generateMetadata(row);
257
258            Example<T> example;
259            if (weightExtractor == null) {
260                example = new ArrayExample<>(label,metadata);
261            } else {
262                example = new ArrayExample<>(label,
263                        weightExtractor.extract(row).orElse(Example.DEFAULT_WEIGHT),
264                        metadata);
265            }
266            example.addAll(features);
267            return Optional.of(example);
268        }
269    }
270
271    /**
272     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
273     * there are no features, or the response is required but it was not found.
274     * <p>
275     * Supplies -1 as the example index, used in cases where the index isn't meaningful.
276     * @param row The row to process.
277     * @param outputRequired If an Output must be found in the row to return an Example.
278     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
279     */
280    public Optional<Example<T>> generateExample(Map<String,String> row, boolean outputRequired) {
281        return generateExample(-1,row,outputRequired);
282    }
283
284    /**
285     * Generate an {@link Example} from the supplied row. Returns an empty Optional if
286     * there are no features, or the response is required but it was not found. The latter case is
287     * used at training time.
288     * @param idx The index for use in the example metadata if desired.
289     * @param row The row to process.
290     * @param outputRequired If an Output must be found in the row to return an Example.
291     * @return An Optional containing an Example if the row was valid, an empty Optional otherwise.
292     */
293    public Optional<Example<T>> generateExample(long idx, Map<String,String> row, boolean outputRequired) {
294        return generateExample(new ColumnarIterator.Row(idx, new ArrayList<>(row.keySet()), row), outputRequired);
295    }
296
297    /**
298     * Generates the example metadata from the supplied row and index.
299     * @param row The row to process.
300     * @return A (possibly empty) map containing the metadata.
301     */
302    public Map<String,Object> generateMetadata(ColumnarIterator.Row row) {
303        if (metadataExtractors.isEmpty()) {
304            return Collections.emptyMap();
305        } else {
306            Map<String,Object> metadataMap = new HashMap<>();
307            long idx = row.getIndex();
308
309            for (FieldExtractor<?> field : metadataExtractors) {
310                String metadataName = field.getMetadataName();
311                Optional<?> extractedValue = field.extract(row);
312                if(extractedValue.isPresent()) {
313                    metadataMap.put(metadataName, extractedValue.get());
314                } else {
315                    logger.warning("Failed to extract field with name " + metadataName + " from index " + idx);
316                }
317            }
318
319            return metadataMap;
320        }
321    }
322
323    /**
324     * Generates the features from the supplied row.
325     * @param row The row to process.
326     * @return A (possibly empty) list of {@link ColumnarFeature}s.
327     */
328    public List<ColumnarFeature> generateFeatures(Map<String,String> row) {
329        if (!configured) {
330            throw new IllegalStateException("expandRegexMapping not called, yet there are entries in regexMappingProcessors which have not been bound to a field name.");
331        }
332        List<ColumnarFeature> features = new ArrayList<>();
333
334        for (Map.Entry<String,FieldProcessor> e : fieldProcessorMap.entrySet()) {
335            String value = row.get(e.getKey());
336            if (value != null) {
337                value = value.replace('\n', ' ').trim();
338                features.addAll(e.getValue().process(value));
339            }
340        }
341
342        for (FeatureProcessor f : featureProcessors) {
343            features = f.process(features);
344        }
345
346        return features;
347    }
348
349    /**
350     * The set of column names this will use for the feature processing.
351     * @return The set of column names it processes.
352     */
353    public Set<String> getColumnNames() {
354        return Collections.unmodifiableSet(fieldProcessorMap.keySet());
355    }
356
357    /**
358     * Returns a description of the row processor and it's fields.
359     * @return A String description of the RowProcessor.
360     */
361    public String getDescription() {
362        String weightExtractorStr = weightExtractor == null ? "null" : weightExtractor.toString();
363        if (configured || regexMappingProcessors.isEmpty()) {
364            return "RowProcessor(responseProcessor=" + responseProcessor.toString() +
365                    ",fieldProcessorMap=" + fieldProcessorMap.toString() +
366                    ",featureProcessors=" + featureProcessors.toString() +
367                    ",metadataExtractors=" + metadataExtractors.toString() +
368                    ",weightExtractor=" + weightExtractorStr + ")";
369        } else {
370            return "RowProcessor(responseProcessor=" + responseProcessor.toString() +
371                    ",fieldProcessorMap=" + fieldProcessorMap.toString() +
372                    ",regexMappingProcessors=" + regexMappingProcessors.toString() +
373                    ",featureProcessors=" + featureProcessors.toString() +
374                    ",metadataExtractors=" + metadataExtractors.toString() +
375                    ",weightExtractor=" + weightExtractorStr + ")";
376        }
377    }
378
379    @Override
380    public String toString() {
381        return getDescription();
382    }
383
384    /**
385     * Returns the metadata keys and value types that are extracted
386     * by this RowProcessor.
387     * @return The metadata keys and value types.
388     */
389    public Map<String,Class<?>> getMetadataTypes() {
390        if (metadataExtractors.isEmpty()) {
391            return Collections.emptyMap();
392        } else {
393            Map<String, Class<?>> types = new HashMap<>();
394
395            for (FieldExtractor<?> extractor : metadataExtractors) {
396                types.put(extractor.getMetadataName(), extractor.getValueType());
397            }
398
399            return types;
400        }
401    }
402
403    /**
404     * Returns true if the regexes have been expanded into field processors.
405     * @return True if the RowProcessor has seen the set of input fields.
406     */
407    public boolean isConfigured() {
408        return configured;
409    }
410
411    /**
412     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
413     * against the {@link ImmutableFeatureMap} contained in the supplied {@link Model}.
414     * Throws an IllegalArgumentException if any regexes overlap with
415     * themselves, or with the currently defined set of fieldProcessorMap.
416     * @param model The model to use to expand the regexes.
417     */
418    public void expandRegexMapping(Model<T> model) {
419        expandRegexMapping(model.getFeatureIDMap());
420    }
421
422    /**
423     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
424     * against the supplied feature map. Throws an IllegalArgumentException if any regexes overlap with
425     * themselves, or with the currently defined set of fieldProcessorMap.
426     * @param featureMap The feature map to use to expand the regexes.
427     */
428    public void expandRegexMapping(ImmutableFeatureMap featureMap) {
429        ArrayList<String> fieldNames = new ArrayList<>(featureMap.size());
430
431        for (VariableInfo v : featureMap) {
432            String[] split = FEATURE_NAME_PATTERN.split(v.getName(),1);
433            String fieldName = split[0];
434            fieldNames.add(fieldName);
435        }
436
437        expandRegexMapping(fieldNames);
438    }
439
440    /**
441     * Uses similar logic to {@link org.tribuo.transform.TransformationMap#validateTransformations} to check the regexes
442     * against the supplied list of field names. Throws an IllegalArgumentException if any regexes overlap with
443     * themselves, or with the currently defined set of fieldProcessorMap or if there are unmatched regexes after
444     * processing.
445     * @param fieldNames The list of field names.
446     */
447    public void expandRegexMapping(Collection<String> fieldNames) {
448        if (configured) {
449            logger.warning("RowProcessor was already configured, yet expandRegexMapping was called with " + fieldNames.toString());
450        } else {
451            Set<String> regexesMatchingFieldNames = partialExpandRegexMapping(fieldNames);
452
453            if (regexesMatchingFieldNames.size() != regexMappingProcessors.size()) {
454                throw new IllegalArgumentException("Failed to match all the regexes, found " + regexesMatchingFieldNames.size() + ", required " + regexMappingProcessors.size());
455            } else {
456                regexMappingProcessors.clear();
457                configured = true;
458            }
459        }
460    }
461
462    /**
463     * Caveat Implementor! This method contains the logic of {@link org.tribuo.data.columnar.RowProcessor#expandRegexMapping}
464     * without any of the checks that ensure the RowProcessor is in a valid state. This can be used in a subclass to expand a regex mapping
465     * several times for a single instance of RowProcessor. The caller is responsible for ensuring that fieldNames are not duplicated
466     * within or between calls.
467     * @param fieldNames The list of field names - should contain only previously unseen field names.
468     * @return the set of regexes that were matched by fieldNames.
469     */
470    protected Set<String> partialExpandRegexMapping(Collection<String> fieldNames) {
471        HashSet<String> regexesMatchingFieldNames = new HashSet<>();
472        // Loop over all regexes
473        for (Map.Entry<String,FieldProcessor> e : regexMappingProcessors.entrySet()) {
474            Pattern p = Pattern.compile(e.getKey());
475            // Loop over all field names
476            for (String s : fieldNames) {
477                // Check if the pattern matches the field name
478                if (p.matcher(s).matches()) {
479                    // If it matches, add the field to the fieldProcessorMap map and the fieldProcessorList (for the provenance).
480                    FieldProcessor newProcessor = e.getValue().copy(s);
481                    fieldProcessorList.add(newProcessor);
482                    FieldProcessor f = fieldProcessorMap.put(s,newProcessor);
483
484
485                    if (f != null) {
486                        throw new IllegalArgumentException("Regex " + p.toString() + " matched field " + s + " which already had a field processor " + f.toString());
487                    }
488
489                    regexesMatchingFieldNames.add(e.getKey());
490                }
491            }
492        }
493        return regexesMatchingFieldNames;
494    }
495
496    /**
497     * @deprecated In a future release this API will change, in the meantime this is the correct way to get a row
498     *   processor with clean state.
499     * <p>
500     * When using regexMappingProcessors, RowProcessor is stateful in a way that can sometimes make it fail the second
501     * time it is used. Concretely:
502     * <pre>
503     *     RowProcessor rp;
504     *     Dataset ds1 = new MutableDataset(new CSVDataSource(csvfile1, rp));
505     *     Dataset ds2 = new MutableDataset(new CSVDataSource(csvfile2, rp)); // this may fail due to state in rp
506     * </pre>
507     * This method returns a RowProcessor with clean state and the same configuration as this row processor.
508     * @return a RowProcessor instance with clean state and the same configuration as this row processor.
509     */
510    @Deprecated
511    public RowProcessor<T> copy() {
512        return new RowProcessor<>(metadataExtractors, weightExtractor, responseProcessor, fieldProcessorMap, regexMappingProcessors, featureProcessors);
513    }
514
515    @Override
516    public ConfiguredObjectProvenance getProvenance() {
517        return new ConfiguredObjectProvenanceImpl(this,"RowProcessor");
518    }
519
520}