/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

import static java.lang.Long.max;
import static java.lang.Long.min;

/**
 * An aggregator for date values. Every date is rounded down using a configured
 * {@link Rounding}.
 *
 * @see Rounding
 */
class DateRangeHistogramAggregator extends BucketsAggregator {

    private final ValuesSource.Range valuesSource;
    private final DocValueFormat formatter;
    private final Rounding rounding;
    /**
     * The rounding prepared for rewriting the data in the shard.
     */
    private final Rounding.Prepared preparedRounding;
    private final BucketOrder order;
    private final boolean keyed;

    private final long minDocCount;
    private final boolean downsampledResultsOffset;
    private final LongBounds extendedBounds;
    private final LongBounds hardBounds;

    private final LongKeyedBucketOrds bucketOrds;

    DateRangeHistogramAggregator(
        String name,
        AggregatorFactories factories,
        Rounding rounding,
        BucketOrder order,
        boolean keyed,
        long minDocCount,
        boolean downsampledResultsOffset,
        @Nullable LongBounds extendedBounds,
        @Nullable LongBounds hardBounds,
        ValuesSourceConfig valuesSourceConfig,
        AggregationContext context,
        Aggregator parent,
        CardinalityUpperBound cardinality,
        Map<String, Object> metadata
    ) throws IOException {

        super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
        this.rounding = rounding;
        this.preparedRounding = valuesSourceConfig.roundingPreparer(context).apply(rounding);
        this.order = order;
        order.validate(this);
        this.keyed = keyed;
        this.minDocCount = minDocCount;
        this.downsampledResultsOffset = downsampledResultsOffset;
        this.extendedBounds = extendedBounds;
        this.hardBounds = hardBounds;
        // TODO: Stop using null here
        this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Range) valuesSourceConfig.getValuesSource() : null;
        this.formatter = valuesSourceConfig.format();
        if (this.valuesSource.rangeType() != RangeType.DATE) {
            throw new IllegalArgumentException(
                "Expected date range type but found range type [" + this.valuesSource.rangeType().name + "]"
            );
        }

        bucketOrds = LongKeyedBucketOrds.build(bigArrays(), cardinality);
    }

    @Override
    public ScoreMode scoreMode() {
        if (valuesSource != null && valuesSource.needsScores()) {
            return ScoreMode.COMPLETE;
        }
        return super.scoreMode();
    }

    @Override
    public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        // Is it possible for multiple values here? Multiple ranges are encoded into the same BytesRef in the binary doc values
        final BinaryDocValues values = FieldData.unwrapSingleton(valuesSource.bytesValues(aggCtx.getLeafReaderContext()));
        assert values != null;
        final RangeType rangeType = valuesSource.rangeType();
        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long owningBucketOrd) throws IOException {
                if (values.advanceExact(doc)) {
                    long previousKey = Long.MIN_VALUE;
                    final List<RangeFieldMapper.Range> ranges = rangeType.decodeRanges(values.binaryValue());
                    long previousFrom = Long.MIN_VALUE;
                    for (RangeFieldMapper.Range range : ranges) {
                        Long from = (Long) range.getFrom();
                        // The encoding should ensure that this assert is always true.
                        assert from >= previousFrom : "Start of range not >= previous start";
                        final Long to = (Long) range.getTo();
                        final long effectiveFrom = (hardBounds != null && hardBounds.getMin() != null)
                            ? max(from, hardBounds.getMin())
                            : from;
                        final long effectiveTo = (hardBounds != null && hardBounds.getMax() != null) ? min(to, hardBounds.getMax()) : to;
                        final long startKey = preparedRounding.round(effectiveFrom);
                        final long endKey = preparedRounding.round(effectiveTo);
                        for (long key = max(startKey, previousKey); key <= endKey; key = preparedRounding.nextRoundingValue(key)) {
                            if (key == previousKey) {
                                continue;
                            }
                            // Bucket collection identical to NumericHistogramAggregator, could be refactored
                            long bucketOrd = bucketOrds.add(owningBucketOrd, key);
                            if (bucketOrd < 0) { // already seen
                                bucketOrd = -1 - bucketOrd;
                                collectExistingBucket(sub, doc, bucketOrd);
                            } else {
                                collectBucket(sub, doc, bucketOrd);
                            }
                        }
                        if (endKey > previousKey) {
                            previousKey = endKey;
                        }
                    }
                }
            }
        };
    }

    @Override
    public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
        return buildAggregationsForVariableBuckets(
            owningBucketOrds,
            bucketOrds,
            (bucketValue, docCount, subAggregationResults) -> new InternalDateHistogram.Bucket(
                bucketValue,
                docCount,
                formatter,
                subAggregationResults
            ),
            (owningBucketOrd, buckets) -> {
                // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
                CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

                // value source will be null for unmapped fields
                // Important: use `rounding` here, not `shardRounding`
                InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
                    ? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
                    : null;
                return new InternalDateHistogram(
                    name,
                    buckets,
                    order,
                    minDocCount,
                    rounding.offset(),
                    emptyBucketInfo,
                    formatter,
                    keyed,
                    downsampledResultsOffset,
                    metadata()
                );
            }
        );
    }

    @Override
    public InternalAggregation buildEmptyAggregation() {
        InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
            ? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
            : null;
        return new InternalDateHistogram(
            name,
            Collections.emptyList(),
            order,
            minDocCount,
            rounding.offset(),
            emptyBucketInfo,
            formatter,
            keyed,
            downsampledResultsOffset,
            metadata()
        );
    }

    @Override
    public void doClose() {
        Releasables.close(bucketOrds);
    }

    @Override
    public void collectDebugInfo(BiConsumer<String, Object> add) {
        add.accept("total_buckets", bucketOrds.size());
    }
}
