001/*
002 * Copyright (c) 2015-2020, Oracle and/or its affiliates. All rights reserved.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.tribuo.data.columnar;
018
019import com.oracle.labs.mlrg.olcut.config.Config;
020import org.tribuo.ConfigurableDataSource;
021import org.tribuo.Example;
022import org.tribuo.Output;
023import org.tribuo.OutputFactory;
024
025import java.util.Iterator;
026import java.util.Map;
027import java.util.NoSuchElementException;
028import java.util.Optional;
029
030/**
031 * A {@link ConfigurableDataSource} base class which takes columnar data (e.g., csv or DB table rows) and generates {@link Example}s.
032 */
033public abstract class ColumnarDataSource<T extends Output<T>> implements ConfigurableDataSource<T> {
034
035    @Config(mandatory = true,description="The output factory to use.")
036    private OutputFactory<T> outputFactory;
037
038    @Config(mandatory = true,description="The row processor to use.")
039    protected RowProcessor<T> rowProcessor;
040
041    @Config(description="Is an output required from each row?")
042    protected boolean outputRequired = true;
043
044    /**
045     * For OLCUT.
046     */
047    protected ColumnarDataSource() {}
048
049    /**
050     * Constructs a columnar data source with the specified parameters.
051     * @param outputFactory The output factory.
052     * @param rowProcessor The row processor which converts rows into examples.
053     * @param outputRequired Is an output required for each example.
054     */
055    protected ColumnarDataSource(OutputFactory<T> outputFactory, RowProcessor<T> rowProcessor, boolean outputRequired) {
056        this.outputFactory = outputFactory;
057        this.rowProcessor = rowProcessor;
058        this.outputRequired = outputRequired;
059    }
060
061    /**
062     * Returns the metadata keys and value types that are created
063     * by this DataSource.
064     * @return The metadata keys and value types.
065     */
066    public Map<String,Class<?>> getMetadataTypes() {
067        return rowProcessor.getMetadataTypes();
068    }
069
070    @Override
071    public OutputFactory<T> getOutputFactory() {
072        return outputFactory;
073    }
074
075    @Override
076    public Iterator<Example<T>> iterator() {
077        return new InnerIterator<>(rowProcessor,rowIterator(),outputRequired);
078    }
079
080    /**
081     * The iterator that emits {@link ColumnarIterator.Row} objects from the
082     * underlying data source.
083     * @return The row level iterator.
084     */
085    protected abstract ColumnarIterator rowIterator();
086
087    /**
088     * Wraps the columnar iterator and converts it into an iterator of example.
089     * Copies the RowProcessor and expands it's regexes first.
090     * @param <T> The output type.
091     */
092    private static class InnerIterator<T extends Output<T>> implements Iterator<Example<T>> {
093        private final boolean outputRequired;
094        private final ColumnarIterator iterator;
095        private final RowProcessor<T> processor;
096
097        private Example<T> buffer = null;
098
099        InnerIterator(RowProcessor<T> processor, ColumnarIterator iterator, boolean outputRequired) {
100            this.processor = processor.copy();
101            if (!this.processor.isConfigured()) {
102                this.processor.expandRegexMapping(iterator.getFields());
103            }
104            this.iterator = iterator;
105            this.outputRequired = outputRequired;
106        }
107
108        @Override
109        public boolean hasNext() {
110            if (buffer != null) {
111                return true;
112            }
113
114            while (buffer == null && iterator.hasNext()) {
115                ColumnarIterator.Row m = iterator.next();
116
117                Optional<Example<T>> exampleOpt = processor.generateExample(m,outputRequired);
118                if (exampleOpt.isPresent()) {
119                    buffer = exampleOpt.get();
120                }
121            }
122            return buffer != null;
123        }
124
125        @Override
126        public Example<T> next() {
127            if (hasNext()) {
128                Example<T> ret = buffer;
129                buffer = null;
130                return ret;
131            } else {
132                throw new NoSuchElementException("No more data");
133            }
134        }
135    }
136}