/*
 * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.ProtocolException;
import java.net.URI;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;

import jdk.internal.net.http.common.*;
import jdk.internal.net.http.frame.*;
import jdk.internal.net.http.hpack.DecodingCallback;

import static jdk.internal.net.http.Exchange.MAX_NON_FINAL_RESPONSES;

/**
 * Http/2 Stream handling.
 *
 * REQUESTS
 *
 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
 *
 * sendRequest() -- sendHeadersOnly() + sendBody()
 *
 * sendBodyAsync() -- calls sendBody() in an executor thread.
 *
 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
 *
 * sendRequestAsync() -- calls sendRequest() in an executor thread
 *
 * RESPONSES
 *
 * Multiple responses can be received per request. Responses are queued up on
 * a LinkedList of CF<HttpResponse> and the first one on the list is completed
 * with the next response
 *
 * getResponseAsync() -- queries list of response CFs and returns first one
 *               if one exists. Otherwise, creates one and adds it to list
 *               and returns it. Completion is achieved through the
 *               incoming() upcall from connection reader thread.
 *
 * getResponse() -- calls getResponseAsync() and waits for CF to complete
 *
 * responseBodyAsync() -- calls responseBody() in an executor thread.
 *
 * incoming() -- entry point called from connection reader thread. Frames are
 *               either handled immediately without blocking or for data frames
 *               placed on the stream's inputQ which is consumed by the stream's
 *               reader thread.
 *
 * PushedStream sub class
 * ======================
 * Sending side methods are not used because the request comes from a PUSH_PROMISE
 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use the responseCF list as there can be only
 * one response. The CF is created when the object is created, and it is completed
 * when the response HEADERS frame is received.
 */
class Stream<T> extends ExchangeImpl<T> {

    private static final String COOKIE_HEADER = "Cookie";
    // Debug logger; messages are prefixed using dbgString().
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);

    // Incoming frames waiting to be delivered to the response body subscriber.
    // Frames are appended by pushDataFrame() and receiveResetFrame(), and
    // taken by schedule() and drainInputQueue().
    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
    // Scheduler whose task is schedule().
    final SequentialScheduler sched =
            SequentialScheduler.lockingScheduler(this::schedule);
    // The subscription passed to the response body subscriber's onSubscribe()
    // by schedule().
    final SubscriptionBase userSubscription =
            new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

    /**
     * This stream's identifier. Assigned lazily by the HTTP2Connection before
     * the stream's first frame is sent.
     */
    protected volatile int streamid;

    long requestContentLen;

    final Http2Connection connection;
    final HttpRequestImpl request;
    final HeadersConsumer rspHeadersConsumer;
    final HttpHeadersBuilder responseHeadersBuilder;
    final HttpHeaders requestPseudoHeaders;
    // The subscriber currently receiving the response body. Null until
    // schedule() promotes pendingResponseSubscriber and subscribes it.
    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
    final HttpRequest.BodyPublisher requestPublisher;
    volatile RequestSubscriber requestSubscriber;
    // Value of the last ":status" pseudo header parsed by handleResponse().
    volatile int responseCode;
    // Last Response object built by handleResponse().
    volatile Response response;
    // The exception with which this stream was canceled.
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
    // Assigned by receiveData().
    volatile CompletableFuture<T> responseBodyCF;
    // Set by receiveData() or nullBody(); picked up by schedule().
    volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
    volatile boolean stopRequested;

    /** True if END_STREAM has been seen in a frame received on this stream. */
    private volatile boolean remotelyClosed;
    // Changes to `closed` are guarded by the stateLock.
    private volatile boolean closed;
    private volatile boolean endStreamSent;
    // Set by handleResponse() once a status code outside 100-199 is received.
    private volatile boolean finalResponseCodeReceived;
    // Set by handleResponse() when a HEADERS block is received after the
    // final response headers.
    private volatile boolean trailerReceived;
    // Incremented by checkInterimResponseCountExceeded().
    private AtomicInteger nonFinalResponseCount = new AtomicInteger();

    // Indicates the first reason that was invoked when sending a ResetFrame
    // to the server. A streamState of 0 indicates that no reset was sent.
    // (see markStream(int code))
    private volatile int streamState; // assigned using STREAM_STATE varhandle.
    private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.

    // state flags
    private boolean requestSent, responseReceived;

    // send lock: prevent sending DataFrames after reset occurred.
    private final Lock sendLock = new ReentrantLock();
    private final Lock stateLock = new ReentrantLock();
    // inputQ lock: methods that take from the inputQ
    //      must not run concurrently.
    private final Lock inputQLock = new ReentrantLock();

    /**
     * A reference to this Stream's connection Send Window controller. The
     * stream MUST acquire the appropriate amount of Send Window before
     * sending any data. Will be null for PushStreams, as they cannot send data.
     */
    private final WindowController windowController;
    // Tracks the bytes received on this stream; see consumed() and
    // receiveDataFrame() for how processed/released bytes are reported.
    private final WindowUpdateSender streamWindowUpdater;

    // Only accessed in all method calls from incoming(), no need for volatile
    private boolean endStreamSeen;

    /**
     * Returns the {@code HttpConnection} held by this stream's
     * {@code Http2Connection}.
     */
    @Override
    HttpConnection connection() {
        return this.connection.connection;
    }

    /**
     * The scheduler task: delivers the frames queued in the inputQ to the
     * response body subscriber.
     *
     * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() },
     * or after the user subscription window has re-opened, from SubscriptionBase.request().
     *
     * The method returns without doing anything if no body subscriber has been
     * supplied yet. Otherwise it subscribes the pending subscriber (once), then
     * loops over the queue:
     * - a ResetFrame ends the loop: it either completes requestBodyCF or is
     *   passed to handleReset();
     * - an empty DataFrame carrying END_STREAM completes the subscriber;
     * - any other DataFrame is passed to onNext() if the subscription has
     *   outstanding demand, and completes the subscriber if it carried END_STREAM.
     *
     * Any exception thrown while processing is recorded in errorRef (if none was
     * recorded before); when errorRef is set the scheduler is stopped, the
     * subscriber's onError() is invoked (unless onComplete() was already called)
     * and the stream is cancelled.
     */
    private void schedule() {
        boolean onCompleteCalled = false;
        HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
        // prevents drainInputQueue() from running concurrently
        inputQLock.lock();
        try {
            if (subscriber == null) {
                // pendingResponseSubscriber will be null until response headers have been received and
                // readBodyAsync is called.
                subscriber = responseSubscriber = pendingResponseSubscriber;
                if (subscriber == null) {
                    // can't process anything yet
                    return;
                }
                if (debug.on()) debug.log("subscribing user subscriber");
                subscriber.onSubscribe(userSubscription);
            }
            while (!inputQ.isEmpty() && errorRef.get() == null) {
                // peek first: the frame is only removed once we know it can be handled
                Http2Frame frame = inputQ.peek();
                if (frame instanceof ResetFrame rf) {
                    inputQ.remove();
                    if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
                        // If END_STREAM is already received, complete the requestBodyCF successfully
                        // and stop sending any request data.
                        requestBodyCF.complete(null);
                    } else {
                        handleReset(rf, subscriber);
                    }
                    return;
                }
                DataFrame df = (DataFrame) frame;
                boolean finished = df.getFlag(DataFrame.END_STREAM);

                List<ByteBuffer> buffers = df.getData();
                List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
                int size = Utils.remaining(dsts, Integer.MAX_VALUE);
                if (size == 0 && finished) {
                    inputQ.remove();
                    // consumed will not be called
                    connection.releaseUnconsumed(df); // must update connection window
                    Log.logTrace("responseSubscriber.onComplete");
                    if (debug.on()) debug.log("incoming: onComplete");
                    connection.decrementStreamsCount(streamid);
                    subscriber.onComplete();
                    onCompleteCalled = true;
                    setEndStreamReceived();
                    return;
                } else if (userSubscription.tryDecrement()) {
                    inputQ.remove();
                    Log.logTrace("responseSubscriber.onNext {0}", size);
                    if (debug.on()) debug.log("incoming: onNext(%d)", size);
                    try {
                        subscriber.onNext(dsts);
                    } catch (Throwable t) {
                        // Data frames that have been added to the inputQ
                        // must be released using releaseUnconsumed() to
                        // account for the amount of unprocessed bytes
                        // tracked by the connection.windowUpdater.
                        connection.releaseUnconsumed(df);
                        throw t;
                    }
                    if (consumed(df)) {
                        Log.logTrace("responseSubscriber.onComplete");
                        if (debug.on()) debug.log("incoming: onComplete");
                        connection.decrementStreamsCount(streamid);
                        subscriber.onComplete();
                        onCompleteCalled = true;
                        setEndStreamReceived();
                        return;
                    }
                } else {
                    // no demand: leave the frame in the queue. If a stop was
                    // requested fall through to the error handling below,
                    // otherwise wait to be scheduled again.
                    if (stopRequested) break;
                    return;
                }
            }
        } catch (Throwable throwable) {
            // keep the first recorded error only
            errorRef.compareAndSet(null, throwable);
        } finally {
            inputQLock.unlock();
            // must be called after releasing the lock: drainInputQueue()
            // acquires the inputQLock itself
            if (sched.isStopped()) drainInputQueue();
        }

        Throwable t = errorRef.get();
        if (t != null) {
            sched.stop();
            try {
                if (!onCompleteCalled) {
                    if (debug.on())
                        debug.log("calling subscriber.onError: %s", (Object) t);
                    subscriber.onError(t);
                } else {
                    if (debug.on())
                        debug.log("already completed: dropping error %s", (Object) t);
                }
            } catch (Throwable x) {
                // report the exception raised by onError itself, not the
                // error that was being delivered to the subscriber
                Log.logError("Subscriber::onError threw exception: {0}", x);
            } finally {
                // cancelImpl will eventually call drainInputQueue();
                cancelImpl(t);
            }
        }
    }

    // Empties the inputQ, releasing every data frame it still holds.
    // Invoked from schedule() when the scheduler has been stopped,
    // or after resetting the stream, so that data which was received
    // but never consumed is still accounted for in the connection
    // window flow control.
    // Runs under the inputQLock so that it never takes from the
    // queue concurrently with schedule().
    private void drainInputQueue() {
        // waits here until schedule() has finished taking
        // from the queue, if needed.
        inputQLock.lock();
        try {
            for (Http2Frame pending = inputQ.poll(); pending != null; pending = inputQ.poll()) {
                if (!(pending instanceof DataFrame data)) {
                    // frames of any other type are simply discarded
                    continue;
                }
                // Data frames that have been added to the inputQ
                // must be released using releaseUnconsumed() to
                // account for the amount of unprocessed bytes
                // tracked by the connection.windowUpdater.
                connection.releaseUnconsumed(data);
            }
        } finally {
            inputQLock.unlock();
        }
    }

    /**
     * Installs a body subscriber that discards the body and yields
     * {@code null}, then runs the scheduler so that the frames waiting
     * in the inputQ get processed.
     *
     * @param resp not used by this implementation
     * @param t    not used by this implementation
     */
    @Override
    void nullBody(HttpResponse<T> resp, Throwable t) {
        if (debug.on()) {
            debug.log("nullBody: streamid=%d", streamid);
        }
        // An END_STREAM data frame is expected to be waiting in the inputQ,
        // and the scheduler only processes it once a subscriber is present.
        assert pendingResponseSubscriber == null;
        final HttpResponse.BodySubscriber<T> discarding =
                HttpResponse.BodySubscribers.replacing(null);
        pendingResponseSubscriber = discarding;
        sched.runOrSchedule();
    }

    // Invoked once the Response BodySubscriber has been handed the
    // buffers of a DataFrame: reports the frame's payload length to the
    // connection and stream window updaters.
    // Returns true if the frame carried END_STREAM, false otherwise.
    private boolean consumed(DataFrame df) {
        // RFC 7540 6.1:
        // The entire DATA frame payload is included in flow control,
        // including the Pad Length and Padding fields if present
        final int payloadLength = df.payloadLength();
        final boolean lastFrame = df.getFlag(DataFrame.END_STREAM);

        if (payloadLength != 0) {
            connection.windowUpdater.processed(payloadLength);
            if (lastFrame) {
                // Don't send window update on a stream which is
                // closed or half closed.
                streamWindowUpdater.released(payloadLength);
            } else {
                streamWindowUpdater.processed(payloadLength);
            }
        }

        // true: end of stream; false: more data coming
        return lastFrame;
    }

    /**
     * Marks the request as sent.
     *
     * @param rcode the response code received; not used by this implementation
     */
    @Override
    void expectContinueFailed(int rcode) {
        // No request body is sent after a 417 Expectation Failed (or any
        // other non 100 response code), so the request has to be marked
        // as sent here.
        this.requestSent();
    }

    // Called by Http2Connection::decrementStreamsCount to guarantee that
    // the stream count is decremented at most once for a given stream:
    // only the first invocation flips the flag and returns true.
    boolean deRegister() {
        final boolean firstCall = DEREGISTERED.compareAndSet(this, false, true);
        return firstCall;
    }

    /**
     * Creates the response body subscriber from the given handler and starts
     * delivering the response body to it.
     *
     * If the exchange has a push group, any error completing the returned
     * future is also reported to that group. If creating the subscriber or
     * starting the delivery throws, the stream is cancelled with that
     * exception and a failed future is returned.
     *
     * @param handler                the handler supplying the body subscriber
     * @param returnConnectionToPool not used by this implementation
     * @param executor               passed on to receiveData()
     * @return a future completed with the response body, or exceptionally
     */
    @Override
    CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                       boolean returnConnectionToPool,
                                       Executor executor)
    {
        try {
            Log.logTrace("Reading body on stream {0}", streamid);
            // guarded like every other debug call in this class: avoids
            // building the message when debug logging is disabled
            if (debug.on()) debug.log("Getting BodySubscriber for: " + response);
            Http2StreamResponseSubscriber<T> bodySubscriber =
                    createResponseSubscriber(handler, new ResponseInfoImpl(response));
            CompletableFuture<T> cf = receiveData(bodySubscriber, executor);

            PushGroup<?> pg = exchange.getPushGroup();
            if (pg != null) {
                // if an error occurs make sure it is recorded in the PushGroup
                cf = cf.whenComplete((t, e) -> pg.pushError(e));
            }
            return cf;
        } catch (Throwable t) {
            // may be thrown by handler.apply
            cancelImpl(t);
            return MinimalFuture.failedFuture(t);
        }
    }

    /**
     * Applies the handler to the response info and wraps the resulting
     * body subscriber in an {@code Http2StreamResponseSubscriber}.
     *
     * @param handler  the body handler supplying the subscriber
     * @param response the response info passed to the handler
     * @return the wrapping subscriber
     */
    @Override
    Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
        final BodySubscriber<T> userSubscriber = handler.apply(response);
        return new Http2StreamResponseSubscriber<>(userSubscriber);
    }

    // Registers the Http2StreamResponseSubscriber with the HttpClient,
    // which ensures that the subscriber gets completed if the
    // SelectorManager aborts due to unexpected exceptions.
    // Returns whatever the client's registerSubscriber() returns.
    private boolean registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
        final boolean registered = client().registerSubscriber(subscriber);
        return registered;
    }

    // Removes the subscriber's registration with the HttpClient.
    // Returns whatever the client's unregisterSubscriber() returns.
    private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
        final boolean unregistered = client().unregisterSubscriber(subscriber);
        return unregistered;
    }

    /** Returns a short description of this stream: its stream id. */
    @Override
    public String toString() {
        return "streamid: ".concat(Integer.toString(streamid));
    }

    // Accounts for a received data frame in the connection and stream
    // windows and, if both accept it, pushes it on the inputQ.
    // The scheduler is always run afterwards, whatever the outcome.
    private void receiveDataFrame(DataFrame df) {
        try {
            final int payloadLength = df.payloadLength();
            if (payloadLength > 0) {
                final boolean connectionCanBuffer =
                        connection.windowUpdater.canBufferUnprocessedBytes(payloadLength);
                if (!connectionCanBuffer) {
                    // we return from here if the connection is being closed.
                    return;
                }
                final boolean streamCanBuffer = !closed
                        && streamWindowUpdater.canBufferUnprocessedBytes(payloadLength);
                if (!streamCanBuffer) {
                    // we return from here if the stream is being closed.
                    connection.releaseUnconsumed(df);
                    return;
                }
            }
            pushDataFrame(payloadLength, df);
        } finally {
            sched.runOrSchedule();
        }
    }

    // Adds the data frame to the inputQ unless the stream is closed,
    // in which case a non-empty frame is released instead.
    // Changes to the `closed` boolean are guarded by the stateLock,
    // which is why the flag is read and the frame added under that lock:
    // no data frame can be pushed after the stream is closed.
    // Contention should be low as only one thread at a time adds to
    // the inputQ, and we can only contend when closing the stream.
    // This method may run concurrently with methods holding the
    // inputQLock, and that is OK: the inputQLock only keeps methods
    // that take from the queue from running concurrently with each
    // other. Adding at the end of the queue while another thread is
    // peeking/polling at the head is fine.
    private void pushDataFrame(int len, DataFrame df) {
        final boolean alreadyClosed;
        stateLock.lock();
        try {
            alreadyClosed = this.closed;
            if (!alreadyClosed) {
                inputQ.add(df);
            }
        } finally {
            stateLock.unlock();
        }
        if (alreadyClosed && len > 0) {
            connection.releaseUnconsumed(df);
        }
    }

    /**
     * Queues a RESET frame behind the frames already in the inputQ and
     * runs the scheduler. RESET is always handled inline in the queue.
     */
    private void receiveResetFrame(ResetFrame frame) {
        this.inputQ.add(frame);
        this.sched.runOrSchedule();
    }

    /**
     * Atomically records in the streamState the first reason for which a
     * ResetFrame was sent to the server, and returns the value the
     * streamState had before the call. Once a non-zero reason has been
     * recorded, later calls leave it unchanged.
     * Passing a code of {@code 0} only reads the current state.
     * This can be used to send a ResetFrame only if no reset frame
     * has been sent before, for instance: <pre>{@code
     *  if (markStream(ResetFrame.CANCEL) == 0) {
     *      connection.sendResetFrame(streamId, ResetFrame.CANCEL);
     *  }
     *  }</pre>
     * @param code the reason code as per HTTP/2 protocol
     * @return the previous value of the stream state.
     */
    int  markStream(int code) {
        if (code != 0) {
            // taken under the sendLock, which sendDataFrame() also holds
            // while checking the streamState
            sendLock.lock();
            try {
                return (int) STREAM_STATE.compareAndExchange(this, 0, code);
            } finally {
                sendLock.unlock();
            }
        }
        return streamState;
    }

    // Sends the data frame on the connection, unless a reset has already
    // been recorded for this stream, in which case the frame is dropped.
    // The check and the send are both performed under the sendLock.
    private void sendDataFrame(DataFrame frame) {
        sendLock.lock();
        try {
            final boolean resetRecorded = streamState != 0;
            if (resetRecorded) {
                // must not send DataFrame after reset.
                return;
            }
            connection.sendDataFrame(frame);
        } finally {
            sendLock.unlock();
        }
    }

    // pushes entire response body into response subscriber
    // blocking when required by local or remote flow control
    //
    // Registers the given subscriber as the pending response subscriber,
    // creates the future that will hold the body, and runs the scheduler.
    // If the stream is already cancelled, the future is completed
    // exceptionally with the cancel cause before the scheduler is run.
    // Returns the body future (the value of responseBodyCF).
    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
        // ensure that the body subscriber will be subscribed and onError() is
        // invoked
        pendingResponseSubscriber = bodySubscriber;

        // We want to allow the subscriber's getBody() method to block so it
        // can work with InputStreams. So, we offload execution.
        responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
                new MinimalFuture<>(), this::cancelImpl);

        if (isCanceled()) {
            Throwable t = getCancelCause();
            responseBodyCF.completeExceptionally(t);
        }

        sched.runOrSchedule(); // in case data waiting already to be processed, or error

        return responseBodyCF;
    }

    /**
     * Sends the request body, returning a future that yields this stream
     * once sendBodyImpl() has completed.
     */
    @Override
    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
        return sendBodyImpl().thenApply(ignored -> this);
    }

    /**
     * Creates a stream for the given exchange on the given connection.
     *
     * @param connection       the HTTP/2 connection this stream belongs to
     * @param e                the exchange; also supplies the request
     * @param windowController the connection's send window controller
     */
    Stream(Http2Connection connection,
           Exchange<T> e,
           WindowController windowController)
    {
        super(e);
        this.connection = connection;
        this.windowController = windowController;
        this.request = e.request();
        this.requestPublisher = request.requestPublisher;  // may be null
        this.responseHeadersBuilder = new HttpHeadersBuilder();
        this.rspHeadersConsumer = new HeadersConsumer();
        // pseudo headers are derived from the request once, at construction
        this.requestPseudoHeaders = createPseudoHeaders(request);
        this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
    }

    // Returns true if the request has been cancelled, false otherwise.
    // When the request is cancelled this also cancels the stream, or,
    // if an error was already recorded in errorRef, sends a
    // reset frame with the CANCEL code.
    private boolean checkRequestCancelled() {
        if (!exchange.multi.requestCancelled()) {
            return false;
        }
        if (errorRef.get() != null) {
            sendResetStreamFrame(ResetFrame.CANCEL);
        } else {
            cancel();
        }
        return true;
    }

    /**
     * Entry point from Http2Connection reader thread.
     *
     * Header frames are handled immediately. Data frames are queued for
     * the response body thread, or dropped if the request was cancelled
     * or the stream is closed. Any other frame is dispatched to
     * otherFrame(), unless the request was cancelled or the stream is closed.
     *
     * @param frame the frame received for this stream
     * @throws IOException if thrown while handling the frame
     */
    void incoming(Http2Frame frame) throws IOException {
        if (debug.on()) {
            debug.log("incoming: %s", frame);
        }
        // evaluated before looking at the frame, whatever its type
        final boolean dropFrame = checkRequestCancelled() || closed;

        if (frame instanceof HeaderFrame headers) {
            if (headers.endHeaders()) {
                Log.logTrace("handling response (streamid={0})", streamid);
                handleResponse(headers);
            }
            if (headers.getFlag(HeaderFrame.END_STREAM)) {
                endStreamSeen = true;
                if (debug.on()) {
                    debug.log("handling END_STREAM: %d", streamid);
                }
                // signal the end of the stream with an empty data frame
                receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
            }
            return;
        }

        if (frame instanceof DataFrame data) {
            if (data.getFlag(DataFrame.END_STREAM)) {
                endStreamSeen = true;
            }
            if (!dropFrame) {
                receiveDataFrame(data);
                return;
            }
            if (debug.on()) {
                debug.log("request cancelled or stream closed: dropping data frame");
            }
            // Data frames that have not been added to the inputQ
            // can be released using dropDataFrame
            connection.dropDataFrame(data);
            return;
        }

        if (!dropFrame) {
            otherFrame(frame);
        }
    }

    /**
     * Dispatches a window update, reset or priority frame to its handler.
     *
     * @param frame the frame to dispatch
     * @throws IOException if the frame is of any other type, or if thrown
     *                     by the handler
     */
    void otherFrame(Http2Frame frame) throws IOException {
        switch (frame.type()) {
            case WindowUpdateFrame.TYPE:
                incoming_windowUpdate((WindowUpdateFrame) frame);
                break;
            case ResetFrame.TYPE:
                incoming_reset((ResetFrame) frame);
                break;
            case PriorityFrame.TYPE:
                incoming_priority((PriorityFrame) frame);
                break;
            default:
                throw new IOException("Unexpected frame: " + frame);
        }
    }

    // Returns this stream's consumer of response header name,value pairs:
    // the Hpack decoder decodes into one of these consumers.

    DecodingCallback rspHeadersConsumer() {
        return this.rspHeadersConsumer;
    }

    /**
     * Counts one more interim response and checks the count against
     * {@code MAX_NON_FINAL_RESPONSES}. The limit is only enforced when
     * it is strictly positive.
     *
     * @return a protocol error message if the limit is exceeded (or the
     *         counter overflowed), {@code null} otherwise
     */
    String checkInterimResponseCountExceeded() {
        // this is also checked by Exchange - but tracking it here too provides
        // a more informative message.
        final int count = nonFinalResponseCount.incrementAndGet();
        final boolean limitEnabled = MAX_NON_FINAL_RESPONSES > 0;
        final boolean limitExceeded = count < 0 || count > MAX_NON_FINAL_RESPONSES;
        if (!(limitEnabled && limitExceeded)) {
            return null;
        }
        return String.format(
                "Stream %s PROTOCOL_ERROR: too many interim responses received: %s > %s",
                streamid, count, MAX_NON_FINAL_RESPONSES);
    }

    /**
     * Handles a complete block of response headers. Called from incoming()
     * when the header frame has its end-of-headers flag set.
     *
     * Until a final (non 1xx) status code has been received, the headers are
     * treated as response headers: the ":status" pseudo header is parsed, a
     * Response is built and passed to completeResponse(). The stream is
     * cancelled with PROTOCOL_ERROR if the status is missing, if a 1xx
     * response carries END_STREAM, or if too many interim responses have
     * been received.
     *
     * After the final status code has been received the headers are treated
     * as trailers; a second block of trailers cancels the stream with
     * PROTOCOL_ERROR.
     *
     * In every case the headers consumer is reset before returning normally.
     *
     * @param hf the header frame that completed the header block
     * @throws IOException declared by this method; NOTE(review): no
     *         statement visible here throws it directly - confirm callees
     */
    protected void handleResponse(HeaderFrame hf) throws IOException {
        HttpHeaders responseHeaders = responseHeadersBuilder.build();

        if (!finalResponseCodeReceived) {
            try {
                // a missing :status header is a protocol error
                responseCode = (int) responseHeaders
                        .firstValueAsLong(":status")
                        .orElseThrow(() -> new ProtocolException(String.format(
                                "Stream %s PROTOCOL_ERROR: no status code in response",
                                streamid)));
            } catch (ProtocolException cause) {
                cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
                rspHeadersConsumer.reset();
                return;
            }

            String protocolErrorMsg = null;
            // If informational code, response is partially complete
            if (responseCode < 100 || responseCode > 199) {
                this.finalResponseCodeReceived = true;
            } else if (hf.getFlag(HeaderFrame.END_STREAM)) {
                // see RFC 9113 section 8.1:
                // A HEADERS frame with the END_STREAM flag set that carries an
                // informational status code is malformed
                protocolErrorMsg = String.format(
                        "Stream %s PROTOCOL_ERROR: " +
                        "HEADERS frame with status %s has END_STREAM flag set",
                        streamid, responseCode);
            } else {
                // interim (1xx) response: count it against the limit
                protocolErrorMsg = checkInterimResponseCountExceeded();
            }

            if (protocolErrorMsg != null) {
                if (debug.on()) {
                    debug.log(protocolErrorMsg);
                }
                cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
                rspHeadersConsumer.reset();
                return;
            }

            response = new Response(
                    request, exchange, responseHeaders, connection(),
                    responseCode, HttpClient.Version.HTTP_2);

            /* TODO: review if needs to be removed
               the value is not used, but in case `content-length` doesn't parse as
               long, there will be NumberFormatException. If left as is, make sure
               code up the stack handles NFE correctly. */
            responseHeaders.firstValueAsLong("content-length");

            if (Log.headers()) {
                StringBuilder sb = new StringBuilder("RESPONSE HEADERS (streamid=%s):\n".formatted(streamid));
                sb.append("  %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
                Log.dumpHeaders(sb, "    ", responseHeaders);
                Log.logHeaders(sb.toString());
            }

            // this will clear the response headers
            rspHeadersConsumer.reset();

            completeResponse(response);
        } else {
            // the final response headers were already received:
            // this header block carries trailers
            if (Log.headers()) {
                StringBuilder sb = new StringBuilder("TRAILING HEADERS (streamid=%s):\n".formatted(streamid));
                Log.dumpHeaders(sb, "    ", responseHeaders);
                Log.logHeaders(sb.toString());
            }
            if (trailerReceived) {
                // only one block of trailers is accepted
                String protocolErrorMsg = String.format(
                        "Stream %s PROTOCOL_ERROR: trailers already received", streamid);
                if (debug.on()) {
                    debug.log(protocolErrorMsg);
                }
                cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
            }
            trailerReceived = true;
            rspHeadersConsumer.reset();
        }

    }

    /**
     * Handles a RST_STREAM frame received on this stream. Depending on
     * the state of the stream the frame is ignored, handled immediately,
     * used to complete the request body, or queued behind the frames
     * already waiting in the inputQ.
     *
     * @param frame the RST_STREAM frame received
     */
    void incoming_reset(ResetFrame frame) {
        Log.logTrace("Received RST_STREAM on stream {0}", streamid);
        // responseSubscriber will be null if readBodyAsync has not yet been called
        Flow.Subscriber<?> bodySubscriber = responseSubscriber;
        if (bodySubscriber == null) {
            bodySubscriber = pendingResponseSubscriber;
        }

        // See RFC 9113 sec 5.1 Figure 2, life-cycle of a stream
        if (endStreamReceived() && requestBodyCF.isDone()) {
            // Stream is in a half closed or fully closed state, the RST_STREAM is ignored and logged.
            Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
            return;
        }
        if (closed) {
            // Stream is in a fully closed state, the RST_STREAM is ignored and logged.
            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
            return;
        }
        if (bodySubscriber == null && !endStreamSeen) {
            // no subscriber yet and the reader has not seen an END_STREAM flag: handle reset immediately
            handleReset(frame, null);
            return;
        }
        if (!requestBodyCF.isDone()) {
            // Not done sending the body, complete exceptionally or normally based on RST_STREAM error code
            incompleteRequestBodyReset(frame, bodySubscriber);
            return;
        }
        if (response == null || !finalResponseCodeReceived) {
            // Complete response has not been received, handle reset immediately
            handleReset(frame, null);
            return;
        }
        // Put ResetFrame into inputQ. Any frames already in the queue will be processed before the ResetFrame.
        receiveResetFrame(frame);
        Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
    }

    /**
     * Handles a RST_STREAM frame received while the request body has not
     * been fully sent.
     *
     * With an error code other than NO_ERROR the request body future is
     * completed exceptionally and the stream is cancelled. With NO_ERROR
     * the request body future is completed normally, provided that
     * END_STREAM has been seen and the final response code received;
     * otherwise the reset is handled immediately.
     *
     * @param frame      the RST_STREAM frame received
     * @param subscriber the response body subscriber, may be {@code null}
     */
    void incompleteRequestBodyReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
        if (frame.getErrorCode() == ResetFrame.NO_ERROR) {
            if (debug.on()) {
                debug.log("completing requestBodyCF normally due to received" +
                        " RESET(NO_ERROR) (stream=%s)", streamid);
            }
            if (endStreamSeen && finalResponseCodeReceived) {
                requestBodyCF.complete(null);
            } else {
                // If no END_STREAM flag seen or the final response code has not been received, any RST_STREAM
                // should be handled here immediately
                handleReset(frame, subscriber);
            }
            return;
        }

        if (debug.on()) {
            debug.log("completing requestBodyCF exceptionally due to received" +
                    " RESET(%s) (stream=%s)", frame.getErrorCode(), streamid);
        }
        final IOException resetFailure = new IOException("RST_STREAM received " +
                ResetFrame.stringForCode(frame.getErrorCode()));
        requestBodyCF.completeExceptionally(resetFailure);
        cancelImpl(resetFailure, frame.getErrorCode());
    }

    /**
     * Processes a RST_STREAM frame: marks the stream closed, records the
     * failure, completes the pending futures exceptionally and removes the
     * stream from the connection.
     * <p>
     * If the stream is already closed the frame is ignored.
     *
     * @param frame      the RST_STREAM frame being handled
     * @param subscriber the subscriber to notify through {@code onError};
     *                   may be {@code null}. It is only notified when the
     *                   error code is not REFUSED_STREAM and no earlier
     *                   error had been recorded in {@code errorRef}.
     */
    void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
        Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
        // First check without the lock, then re-check under stateLock so
        // that only one caller gets to flip 'closed' to true.
        if (!closed) {
            stateLock.lock();
            try {
                if (closed) {
                    if (debug.on()) debug.log("Stream already closed: ignoring RESET");
                    return;
                }
                closed = true;
            } finally {
                stateLock.unlock();
            }
            try {
                final int error = frame.getErrorCode();
                // A REFUSED_STREAM error code implies that the stream wasn't processed by the
                // peer and the client is free to retry the request afresh.
                if (error == ErrorFrame.REFUSED_STREAM) {
                    // Here we arrange for the request to be retried. Note that we don't call
                    // closeAsUnprocessed() method here because the "closed" state is already set
                    // to true a few lines above and calling close() from within
                    // closeAsUnprocessed() will end up being a no-op. We instead do the additional
                    // bookkeeping here.
                    markUnprocessedByPeer();
                    errorRef.compareAndSet(null, new IOException("request not processed by peer"));
                    if (debug.on()) {
                        debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
                    }
                } else {
                    final String reason = ErrorFrame.stringForCode(error);
                    final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
                    if (debug.on()) {
                        debug.log(streamid + " received RST_STREAM with code: " + reason);
                    }
                    // Only the first recorded error is reported to the subscriber.
                    if (errorRef.compareAndSet(null, failureCause)) {
                        if (subscriber != null) {
                            subscriber.onError(failureCause);
                        }
                    }
                }
                // Use whatever error ended up in errorRef: it may be an
                // earlier one if the compareAndSet above did not win.
                final Throwable failureCause = errorRef.get();
                completeResponseExceptionally(failureCause);
                if (!requestBodyCF.isDone()) {
                    requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
                }
                if (responseBodyCF != null) {
                    responseBodyCF.completeExceptionally(failureCause);
                }
            } finally {
                // Always detach the stream from the connection, even if
                // completing the futures threw.
                connection.decrementStreamsCount(streamid);
                connection.closeStream(streamid);
            }
        } else {
            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        }
    }

    /**
     * Handler for PRIORITY frames. Priority handling is not implemented,
     * so this method always throws.
     *
     * @param frame the PRIORITY frame (unused)
     * @throws UnsupportedOperationException always
     */
    void incoming_priority(PriorityFrame frame) {
        // TODO: implement priority
        final String reason = "Not implemented";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Applies a WINDOW_UPDATE frame to this stream's send window.
     * <p>
     * A non-positive increment, or an increment that the window controller
     * reports as unsuccessful (overflow), causes the stream to be reset with
     * {@code FLOW_CONTROL_ERROR}.
     *
     * @param frame the WINDOW_UPDATE frame
     * @throws IOException declared by the method; may be raised by the
     *         connection calls made here
     */
    private void incoming_windowUpdate(WindowUpdateFrame frame)
        throws IOException
    {
        final int increment = frame.getUpdate();
        if (increment > 0) {
            assert streamid != 0;
            final boolean grown = windowController.increaseStreamWindow(increment, streamid);
            if (!grown) {
                // the window would overflow
                connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
            }
            return;
        }
        Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                     streamid, increment);
        connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
    }

    /**
     * Handles a PUSH_PROMISE received on this stream.
     * <p>
     * The promised stream is refused (RST_STREAM with REFUSED_STREAM) when
     * the exchange has no push group or the request has been cancelled. It
     * is cancelled when the push group's acceptor does not accept the
     * request, or when asking it throws. Otherwise the pushed stream is
     * given the acceptor's body handler, and the acceptor's future is
     * completed when the pushed stream's response future completes.
     *
     * @param pushRequest the request synthesized from the PUSH_PROMISE
     * @param pushStream  the stream on which the pushed response will arrive
     * @throws IOException declared by the method; may be raised by the
     *         connection calls made here
     */
    void incoming_pushPromise(HttpRequestImpl pushRequest,
                              PushedStream<T> pushStream)
        throws IOException
    {
        if (Log.requests()) {
            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
        }
        final PushGroup<T> group = exchange.getPushGroup();
        final boolean cannotAccept = group == null || exchange.multi.requestCancelled();
        if (cannotAccept) {
            Log.logTrace("Rejecting push promise stream " + streamid);
            connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
            pushStream.close();
            return;
        }

        // Ask the push group whether the promise is accepted. An exception
        // thrown while asking is logged and treated as a rejection.
        PushGroup.Acceptor<T> acceptor = null;
        boolean accepted = false;
        try {
            acceptor = group.acceptPushRequest(pushRequest);
            accepted = acceptor.accepted();
        } catch (Throwable failure) {
            if (debug.on()) {
                debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
                          (Object)failure);
            }
        }
        if (!accepted) {
            // cancel / reject
            final IOException rejection =
                    new IOException("Stream " + streamid + " cancelled by users handler");
            if (Log.trace()) {
                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
                        rejection.getMessage());
            }
            pushStream.cancelImpl(rejection);
            return;
        }

        assert accepted && acceptor != null;
        final CompletableFuture<HttpResponse<T>> userFuture = acceptor.cf();
        final HttpResponse.BodyHandler<T> handler = acceptor.bodyHandler();
        assert handler != null;

        pushStream.requestSent();
        pushStream.setPushHandler(handler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
        // setup housekeeping for when the push is received
        // TODO: deal with ignoring of CF anti-pattern
        pushStream.responseCF().whenComplete((response, error) -> {
            final Throwable cause = Utils.getCompletionCause(error);
            if (Log.trace()) {
                final String suffix = (cause == null) ? "" : " with exception " + cause;
                Log.logTrace("Push completed on stream {0} for {1}{2}",
                             pushStream.streamid, response, suffix);
            }
            if (cause == null) {
                userFuture.complete(response);
            } else {
                group.pushError(cause);
                userFuture.completeExceptionally(cause);
            }
            group.pushCompleted();
        });

    }

    /**
     * Builds the outgoing HEADERS for this request from its system and user
     * headers.
     * <p>
     * A {@code content-length} system header is set only when
     * {@code contentLength} is positive. When {@code contentLength} is zero
     * the END_STREAM flag is set on the returned frame and
     * {@code endStreamSent} is recorded.
     *
     * @param contentLength the request body length
     * @return the outgoing headers
     */
    private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
        final HttpHeadersBuilder systemBuilder = request.getSystemHeadersBuilder();
        if (contentLength > 0) {
            systemBuilder.setHeader("content-length", String.valueOf(contentLength));
        }
        final HttpHeaders filteredSystem = filterHeaders(systemBuilder.build());
        final HttpHeaders filteredUser = filterHeaders(request.getUserHeaders());
        // Filter context restricted from userHeaders
        final HttpHeaders userHeaders = HttpHeaders.of(filteredUser.map(), Utils.ACCEPT_ALL);
        Utils.setUserAuthFlags(request, userHeaders);

        // A system header is kept if it is "Cookie" (values set by the
        // CookieHandler must not be overridden; user cookies are appended
        // to system cookies) or if the user did not supply that header.
        final BiPredicate<String, String> keepSystemHeader =
                (name, value) -> COOKIE_HEADER.equalsIgnoreCase(name)
                                 || userHeaders.firstValue(name).isEmpty();
        final HttpHeaders systemHeaders = HttpHeaders.of(filteredSystem.map(), keepSystemHeader);

        final OutgoingHeaders<Stream<T>> frame =
                new OutgoingHeaders<>(systemHeaders, userHeaders, this);
        if (contentLength == 0) {
            frame.setFlag(HeadersFrame.END_STREAM);
            endStreamSent = true;
        }
        return frame;
    }

    /**
     * Tells whether the given headers contain a {@code proxy-authorization}
     * value.
     *
     * @param headers the headers to inspect
     * @return {@code true} if a first value is present for that header
     */
    private boolean hasProxyAuthorization(HttpHeaders headers) {
        final var proxyAuthorization = headers.firstValue("proxy-authorization");
        return proxyAuthorization.isPresent();
    }

    // Determines whether a new HttpHeaders object has to be built.
    //
    // Ideally the filter would be passed to OutgoingHeaders, and the code
    // that creates the HeaderFrame would be refactored to honor it.
    // We're not there yet - so, depending on the filter to apply and on
    // the content of the headers, we try to determine whether anything
    // might need to be filtered. If nothing needs filtering the original
    // headers can be used as is.
    private boolean needsFiltering(HttpHeaders headers,
                                   BiPredicate<String, String> filter) {
        final boolean tunnelling = filter == Utils.PROXY_TUNNEL_FILTER;
        final boolean proxying = filter == Utils.PROXY_FILTER;
        if (!tunnelling && !proxying) {
            // We're talking to a server, either directly or through a
            // tunnel: only proxy authorization headers need to be filtered
            // out, so if there are none just pass through.
            return hasProxyAuthorization(headers);
        }
        // We're either connecting or proxying: only disabled schemes need
        // to be filtered out, so if there are none just pass through.
        return Utils.proxyHasDisabledSchemes(tunnelling)
                && hasProxyAuthorization(headers);
    }

    /**
     * Applies the connection's header filter for this request to the given
     * headers, but only when {@code needsFiltering} says it is necessary.
     *
     * @param headers the headers to filter
     * @return a filtered copy, or {@code headers} itself when no filtering
     *         is needed
     */
    private HttpHeaders filterHeaders(HttpHeaders headers) {
        final HttpConnection conn = connection();
        final BiPredicate<String, String> filter = conn.headerFilter(request);
        return needsFiltering(headers, filter)
                ? HttpHeaders.of(headers.map(), filter)
                : headers;
    }

    /**
     * Builds the {@code :method}, {@code :scheme}, {@code :authority} and
     * {@code :path} pseudo headers for the given request.
     * <p>
     * The authority includes the port only when the URI specifies one. An
     * absent or empty path becomes {@code "*"} for OPTIONS requests and
     * {@code "/"} otherwise; the raw query, if any, is appended after a
     * {@code '?'}.
     *
     * @param request the request to describe
     * @return the pseudo headers
     */
    private static HttpHeaders createPseudoHeaders(HttpRequest request) {
        final HttpHeadersBuilder pseudo = new HttpHeadersBuilder();
        final String method = request.method();
        pseudo.setHeader(":method", method);

        final URI uri = request.uri();
        pseudo.setHeader(":scheme", uri.getScheme());

        final String host = uri.getHost();
        final int port = uri.getPort();
        assert host != null;
        final String authority = (port == -1) ? host : host + ":" + port;
        pseudo.setHeader(":authority", authority);

        final String query = uri.getRawQuery();
        String target = uri.getRawPath();
        if (target == null || target.isEmpty()) {
            target = method.equalsIgnoreCase("OPTIONS") ? "*" : "/";
        }
        if (query != null) {
            target = target + "?" + query;
        }
        pseudo.setHeader(":path", Utils.encode(target));
        return pseudo.build();
    }

    /**
     * Returns the value of the {@code requestPseudoHeaders} field.
     *
     * @return the request pseudo headers held by this stream
     */
    HttpHeaders getRequestPseudoHeaders() {
        return this.requestPseudoHeaders;
    }

    /**
     * Records that END_STREAM has been received, by setting
     * {@code remotelyClosed}, and then signals {@code responseReceived()}.
     * Should be called only once; a second call trips the assertion.
     */
    void setEndStreamReceived() {
        if (debug.on()) {
            debug.log("setEndStreamReceived: streamid=%d", streamid);
        }
        assert !remotelyClosed : "Unexpected endStream already set";
        remotelyClosed = true;
        responseReceived();
    }

    /**
     * Tells whether the END_STREAM flag has been seen in any frame received
     * on this stream.
     *
     * @return the current value of {@code remotelyClosed}
     */
    private boolean endStreamReceived() {
        return this.remotelyClosed;
    }

    /**
     * Sends the request headers on the connection.
     * <p>
     * The request content length is taken from the request publisher, or
     * is zero when there is no publisher. If an error has already been
     * recorded for this stream the headers are not sent and a failed
     * future is returned instead.
     *
     * @return a future already completed with this exchange, or a failed
     *         future carrying the previously recorded error
     */
    @Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        if (debug.on()) {
            debug.log("sendHeadersOnly()");
        }
        if (Log.requests() && request != null) {
            Log.logRequest(request.toString());
        }
        requestContentLen = (requestPublisher == null)
                ? 0
                : requestPublisher.contentLength();

        // At this point the stream doesn't have a streamid yet.
        // It will be allocated if we send the request headers.
        final Throwable alreadyFailed = errorRef.get();
        if (alreadyFailed != null) {
            if (debug.on()) {
                debug.log("stream already cancelled, headers not sent: %s", (Object)alreadyFailed);
            }
            return MinimalFuture.failedFuture(alreadyFailed);
        }

        // sending the headers will cause the allocation of the stream id
        final OutgoingHeaders<Stream<T>> headers = headerFrame(requestContentLen);
        connection.sendFrame(headers);
        final CompletableFuture<ExchangeImpl<T>> sent = new MinimalFuture<>();
        sent.complete(this);  // #### good enough for now
        return sent;
    }

    /**
     * Releases this stream: when a positive stream id has been assigned,
     * decrements the connection's stream count and closes the stream on
     * the connection. Otherwise only a debug message is logged.
     */
    @Override
    void released() {
        if (streamid <= 0) {
            if (debug.on()) {
                debug.log("Can't release stream %d", streamid);
            }
            return;
        }
        if (debug.on()) {
            debug.log("Released stream %d", streamid);
        }
        // remove this stream from the Http2Connection map.
        connection.decrementStreamsCount(streamid);
        connection.closeStream(streamid);
    }

    /**
     * Intentionally a no-op for this implementation.
     */
    @Override
    void completed() {
        // There should be nothing to do here: the stream should have
        // been already closed (or will be closed shortly after).
    }

    /**
     * Assigns the given id to this stream and registers it with the
     * connection, unless the stream is already closed or its request was
     * cancelled and {@code registerIfCancelled} is {@code false}.
     *
     * @param id                  the stream id to assign
     * @param registerIfCancelled whether to register even if cancelled
     * @return {@code true} if the stream was not cancelled, {@code false}
     *         otherwise (regardless of whether it was registered)
     */
    boolean registerStream(int id, boolean registerIfCancelled) {
        final boolean alreadyCancelled = closed || exchange.multi.requestCancelled();
        final boolean shouldRegister = registerIfCancelled || !alreadyCancelled;
        if (shouldRegister) {
            this.streamid = id;
            connection.putStream(this, streamid);
            if (debug.on()) {
                debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
                        streamid, alreadyCancelled, registerIfCancelled);
            }
        }
        return !alreadyCancelled;
    }

    /**
     * Wakes up the request subscriber's send scheduler so that it can
     * attempt to send again. Expects a request subscriber to be set.
     */
    void signalWindowUpdate() {
        final RequestSubscriber bodySubscriber = requestSubscriber;
        assert bodySubscriber != null;
        if (debug.on()) {
            debug.log("Signalling window update");
        }
        bodySubscriber.sendScheduler.runOrSchedule();
    }

    // Sentinel buffer queued by RequestSubscriber::onComplete and recognized
    // by identity (==) in RequestSubscriber::trySend.
    static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);

    /**
     * Subscriber for the request body publisher. Buffers received from the
     * publisher are queued in {@code outgoing} and written as DATA frames
     * by {@link #trySend()}, which is run through {@code sendScheduler}.
     * Errors and completion are also funnelled through the scheduler so
     * that they are handled by {@code trySend()}.
     */
    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
        // can be < 0 if the actual length is not known.
        private final long contentLength;
        // Number of body bytes still expected; only decremented in trySend
        // when contentLength > 0.
        private volatile long remainingContentLength;
        private volatile Subscription subscription;

        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
        //  1) The data that was published by the request body Publisher, and
        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();

        // Error reported through onError. This is the subscriber's own field,
        // distinct from the errorRef used by the methods of the enclosing stream.
        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        // A scheduler used to honor window updates. Writing must be paused
        // when the window is exhausted, and resumed when the window acquires
        // some space. The sendScheduler makes it possible to implement this
        // behaviour in an asynchronous non-blocking way.
        // See RequestSubscriber::trySend below.
        final SequentialScheduler sendScheduler;

        /**
         * @param contentLen the declared request body length; see the
         *                   comment on {@code contentLength}
         */
        RequestSubscriber(long contentLen) {
            this.contentLength = contentLen;
            this.remainingContentLength = contentLen;
            this.sendScheduler =
                    SequentialScheduler.lockingScheduler(this::trySend);
        }

        /**
         * Stores the subscription and requests the first item.
         *
         * @throws IllegalStateException if a subscription was already set
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                throw new IllegalStateException("already subscribed");
            }
            this.subscription = subscription;
            if (debug.on())
                debug.log("RequestSubscriber: onSubscribe, request 1");
            subscription.request(1);
        }

        /**
         * Receives the next chunk of request body. The outgoing queue is
         * expected to be empty at this point since items are requested one
         * at a time.
         */
        @Override
        public void onNext(ByteBuffer item) {
            if (debug.on())
                debug.log("RequestSubscriber: onNext(%d)", item.remaining());
            int size = outgoing.size();
            assert size == 0 : "non-zero size: " + size;
            onNextImpl(item);
        }

        // Queues the item (a data buffer or the COMPLETED sentinel) and
        // triggers the send scheduler. If requestBodyCF is already done the
        // item is dropped, the scheduler stopped and the subscription cancelled.
        private void onNextImpl(ByteBuffer item) {
            // Got some more request body bytes to send.
            if (requestBodyCF.isDone()) {
                if (debug.on()) {
                    debug.log("RequestSubscriber: requestBodyCf is done: " +
                            "cancelling subscription");
                }
                // stream already cancelled, probably in timeout
                sendScheduler.stop();
                subscription.cancel();
                return;
            }
            outgoing.add(item);
            sendScheduler.runOrSchedule();
        }

        /**
         * Records the first error reported by the publisher and triggers
         * the send scheduler; the error itself is processed in
         * {@link #trySend()}.
         */
        @Override
        public void onError(Throwable throwable) {
            if (debug.on())
                debug.log(() -> "RequestSubscriber: onError: " + throwable);
            // ensure that errors are handled within the flow.
            if (errorRef.compareAndSet(null, throwable)) {
                sendScheduler.runOrSchedule();
            }
        }

        /**
         * Queues the COMPLETED sentinel so that completion is processed by
         * {@link #trySend()} after any pending data.
         */
        @Override
        public void onComplete() {
            if (debug.on()) debug.log("RequestSubscriber: onComplete");
            int size = outgoing.size();
            assert size == 0 || size == 1 : "non-zero or one size: " + size;
            // last byte of request body has been obtained.
            // ensure that everything is completed within the flow.
            onNextImpl(COMPLETED);
        }

        // Attempts to send the data, if any.
        // Handles errors and completion state.
        // Pause writing if the send window is exhausted, resume it if the
        // send window has some bytes that can be acquired.
        //
        // Any Throwable raised while sending is caught at the bottom: the
        // scheduler is stopped, the subscription cancelled, requestBodyCF
        // failed and the stream cancelled.
        void trySend() {
            try {
                // handle errors raised by onError;
                Throwable t = errorRef.get();
                if (t != null) {
                    sendScheduler.stop();
                    if (requestBodyCF.isDone()) return;
                    subscription.cancel();
                    requestBodyCF.completeExceptionally(t);
                    cancelImpl(t);
                    return;
                }
                // Snapshot of the stream state; a non-zero value stops the
                // sending loop below.
                int state = streamState;

                do {
                    // handle COMPLETED;
                    ByteBuffer item = outgoing.peekFirst();
                    if (item == null) return;
                    else if (item == COMPLETED) {
                        sendScheduler.stop();
                        complete();
                        return;
                    }

                    // handle bytes to send downstream
                    while (item.hasRemaining() && state == 0) {
                        if (debug.on()) debug.log("trySend: %d", item.remaining());
                        DataFrame df = getDataFrame(item);
                        if (df == null) {
                            // The item stays at the head of the queue (it was
                            // only peeked) and is retried on the next run.
                            if (debug.on())
                                debug.log("trySend: can't send yet: %d", item.remaining());
                            return; // the send window is exhausted: come back later
                        }

                        if (contentLength > 0) {
                            remainingContentLength -= df.getDataLength();
                            if (remainingContentLength < 0) {
                                // The publisher produced more bytes than declared.
                                String msg = connection().getConnectionFlow()
                                        + " stream=" + streamid + " "
                                        + "[" + Thread.currentThread().getName() + "] "
                                        + "Too many bytes in request body. Expected: "
                                        + contentLength + ", got: "
                                        + (contentLength - remainingContentLength);
                                assert streamid > 0;
                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                                throw new IOException(msg);
                            } else if (remainingContentLength == 0) {
                                // Last expected byte: piggyback END_STREAM on
                                // this DATA frame.
                                assert !endStreamSent : "internal error, send data after END_STREAM flag";
                                df.setFlag(DataFrame.END_STREAM);
                                endStreamSent = true;
                            }
                        } else {
                            assert !endStreamSent : "internal error, send data after END_STREAM flag";
                        }
                        // Re-read the stream state just before sending.
                        if ((state = streamState) != 0) {
                            // NOTE(review): 't' is always null here (a non-null
                            // value returned at the top of this method), so
                            // this logs "null" rather than a cause.
                            if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
                            break;
                        }
                        if (debug.on())
                            debug.log("trySend: sending: %d", df.getDataLength());
                        sendDataFrame(df);
                    }
                    if (state != 0) break;
                    assert !item.hasRemaining();
                    // The item has been fully sent: drop it from the queue.
                    ByteBuffer b = outgoing.removeFirst();
                    assert b == item;
                } while (outgoing.peekFirst() != null);

                if (state != 0) {
                    // NOTE(review): this reads the subscriber's own errorRef,
                    // which is only set by onError - confirm that the
                    // enclosing stream's errorRef was not intended.
                    t = errorRef.get();
                    if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
                    throw t;
                }

                // Everything queued has been sent: ask for the next item.
                if (debug.on()) debug.log("trySend: request 1");
                subscription.request(1);
            } catch (Throwable ex) {
                if (debug.on()) debug.log("trySend: ", ex);
                sendScheduler.stop();
                subscription.cancel();
                requestBodyCF.completeExceptionally(ex);
                // need to cancel the stream to 1. tell the server
                // we don't want to receive any more data and
                // 2. ensure that the operation ref count will be
                // decremented on the HttpClient.
                cancelImpl(ex);
            }
        }

        // Called from trySend when the COMPLETED sentinel is reached.
        // Resets the stream and throws if fewer bytes than declared were
        // sent; otherwise sends an empty END_STREAM DATA frame if
        // END_STREAM was not sent yet, and completes requestBodyCF.
        private void complete() throws IOException {
            long remaining = remainingContentLength;
            long written = contentLength - remaining;
            if (remaining > 0) {
                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                // let trySend() handle the exception
                throw new IOException(connection().getConnectionFlow()
                                     + " stream=" + streamid + " "
                                     + "[" + Thread.currentThread().getName() +"] "
                                     + "Too few bytes returned by the publisher ("
                                              + written + "/"
                                              + contentLength + ")");
            }
            if (!endStreamSent) {
                endStreamSent = true;
                connection.sendDataFrame(getEmptyEndStreamDataFrame());
            }
            requestBodyCF.complete(null);
        }
    }

    /**
     * Sends a RST_STREAM frame (STREAM_CLOSED) to tell the server to stop
     * sending data on this stream.
     *
     * @return a completed future, or a failed future if resetting the
     *         stream threw
     */
    @Override
    public CompletableFuture<Void> ignoreBody() {
        CompletableFuture<Void> outcome;
        try {
            connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
            outcome = MinimalFuture.completedFuture(null);
        } catch (Throwable failure) {
            Log.logTrace("Error resetting stream {0}", failure.toString());
            outcome = MinimalFuture.failedFuture(failure);
        }
        return outcome;
    }

    /**
     * Carves the next DATA frame out of the given buffer.
     * <p>
     * The requested size is bounded by the connection's maximum send frame
     * size and by what remains in the buffer; the actual size is whatever
     * the window controller grants.
     *
     * @param buffer the request body bytes to send
     * @return a DATA frame with no flags set, or {@code null} if the window
     *         controller granted no bytes (callers such as
     *         {@code RequestSubscriber::trySend} then retry later)
     */
    DataFrame getDataFrame(ByteBuffer buffer) {
        final int wanted = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
        final int granted = windowController.tryAcquire(wanted, streamid, this);
        if (granted <= 0) {
            return null;
        }
        final ByteBuffer payload = Utils.sliceWithLimitedCapacity(buffer, granted);
        return new DataFrame(streamid, 0, payload);
    }

    /**
     * Creates a DATA frame for this stream that carries no payload and has
     * the END_STREAM flag set.
     *
     * @return the empty END_STREAM frame
     */
    private DataFrame getEmptyEndStreamDataFrame()  {
        final List<ByteBuffer> noPayload = List.of();
        return new DataFrame(streamid, DataFrame.END_STREAM, noPayload);
    }

    /**
     * A list of response futures relating to this stream. Normally there is
     * only one response, but interim responses like 100 are allowed
     * and must be passed up to the higher level before continuing. The list
     * also deals with races, such as responses being delivered by
     * completeResponse() before the corresponding CF has been created by
     * getResponseAsync().
     * <p>
     * The list is a plain ArrayList; getResponseAsync(), completeResponse()
     * and completeResponseExceptionally() access it while holding
     * response_cfs_lock.
     */

    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
    final Lock response_cfs_lock = new ReentrantLock();

    /**
     * Returns a future for the next response on this stream.
     * <p>
     * If {@code completeResponse()} already queued a future, that one is
     * used (and removed from the list once done); otherwise a new future is
     * created and queued for {@code completeResponse()} to complete later.
     *
     * @param executor if non-null, and the future is not yet done,
     *                 dependent actions are moved onto this executor
     * @return the response future
     */
    @Override
    CompletableFuture<Response> getResponseAsync(Executor executor) {
        CompletableFuture<Response> pending;
        // The code below deals with the race condition that can occur when
        // completeResponse() is called before getResponseAsync()
        response_cfs_lock.lock();
        try {
            if (response_cfs.isEmpty()) {
                // getResponseAsync() is called first. Create a CompletableFuture
                // that will be completed by completeResponse() when
                // completeResponse() is called.
                pending = new MinimalFuture<>();
                response_cfs.add(pending);
            } else {
                // This CompletableFuture was created by completeResponse().
                // It will already be completed, unless the expect continue
                // timeout fired
                pending = response_cfs.get(0);
                if (pending.isDone()) {
                    pending = response_cfs.remove(0);
                }

                // if we find a cf here it should be already completed.
                // finding a non completed cf should not happen. just assert it.
                assert pending.isDone() || request.expectContinue && expectTimeoutRaised()
                        : "Removing uncompleted response: could cause code to hang!";
            }
        } finally {
            response_cfs_lock.unlock();
        }

        CompletableFuture<Response> result = pending;
        if (executor != null && !result.isDone()) {
            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
            result = result.thenApplyAsync(r -> r, executor);
        }
        Log.logTrace("Response future (stream={0}) is: {1}", streamid, result);
        final PushGroup<?> pushGroup = exchange.getPushGroup();
        if (pushGroup != null) {
            // if an error occurs make sure it is recorded in the PushGroup
            result = result.whenComplete(
                    (response, error) -> pushGroup.pushError(Utils.getCompletionCause(error)));
        }
        return result;
    }

    /**
     * Delivers a response to the first suitable CF on the list, while
     * holding {@code response_cfs_lock}:
     * <ul>
     *   <li>an uncompleted CF found while the expect-continue timeout has
     *       not been raised is removed from the list and completed;</li>
     *   <li>if the expect-continue timeout has been raised, the CF at hand
     *       is completed but left on the list;</li>
     *   <li>if no CF was selected, a new already-completed CF is appended
     *       to the list for {@code getResponseAsync()} to pick up.</li>
     * </ul>
     *
     * @param resp the response to deliver
     */
    void completeResponse(Response resp) {
        response_cfs_lock.lock();
        try {
            CompletableFuture<Response> cf;
            // The size is captured once; the list is only modified right
            // before returning from the loop.
            int cfs_len = response_cfs.size();
            for (int i=0; i<cfs_len; i++) {
                cf = response_cfs.get(i);
                if (!cf.isDone() && !expectTimeoutRaised()) {
                    Log.logTrace("Completing response (streamid={0}): {1}",
                                 streamid, cf);
                    if (debug.on())
                        debug.log("Completing responseCF(%d) with response headers", i);
                    response_cfs.remove(cf);
                    cf.complete(resp);
                    return;
                } else if (expectTimeoutRaised()) {
                    Log.logTrace("Completing response (streamid={0}): {1}",
                            streamid, cf);
                    if (debug.on())
                        debug.log("Completing responseCF(%d) with response headers", i);
                    // The CF stays on the list here: getResponseAsync()
                    // removes the head of the list once it is done.
                    cf.complete(resp);
                    return;
                } // else we found the previous response: just leave it alone.
            }
            // No CF was selected above: queue an already completed one.
            cf = MinimalFuture.completedFuture(resp);
            Log.logTrace("Created completed future (streamid={0}): {1}",
                         streamid, cf);
            if (debug.on())
                debug.log("Adding completed responseCF(0) with response headers");
            response_cfs.add(cf);
        } finally {
            response_cfs_lock.unlock();
        }
    }

    // methods to update state and remove stream when finished

    /**
     * Records, under {@code stateLock}, that the request has been sent, and
     * closes the stream if the response has already been received.
     */
    void requestSent() {
        stateLock.lock();
        try {
            requestSent = true;
            if (!responseReceived) {
                if (debug.on()) {
                    debug.log("requestSent: streamid=%d but response not received", streamid);
                }
                return;
            }
            if (debug.on()) {
                debug.log("requestSent: streamid=%d", streamid);
            }
            close();
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Records, under {@code stateLock}, that the response has been
     * received, and closes the stream if the request has already been sent.
     */
    void responseReceived() {
        stateLock.lock();
        try {
            responseReceived = true;
            if (!requestSent) {
                if (debug.on()) {
                    debug.log("responseReceived: streamid=%d but request not sent", streamid);
                }
                return;
            }
            if (debug.on()) {
                debug.log("responseReceived: streamid=%d", streamid);
            }
            close();
        } finally {
            stateLock.unlock();
        }
    }

    /**
     * Error counterpart of {@code completeResponse(Response)}: removes the
     * first uncompleted CF from the list and completes it exceptionally.
     * If every CF on the list is already done (or the list is empty), a
     * new failed CF is appended instead.
     *
     * @param t the failure to report
     */
    void completeResponseExceptionally(Throwable t) {
        response_cfs_lock.lock();
        try {
            // Locate the first pending CF by index: the list is only
            // modified after the search is over.
            final int count = response_cfs.size();
            int pendingIndex = -1;
            for (int i = 0; i < count && pendingIndex < 0; i++) {
                if (!response_cfs.get(i).isDone()) {
                    pendingIndex = i;
                }
            }
            if (pendingIndex >= 0) {
                final CompletableFuture<Response> pending = response_cfs.remove(pendingIndex);
                pending.completeExceptionally(t);
            } else {
                response_cfs.add(MinimalFuture.failedFuture(t));
            }
        } finally {
            response_cfs_lock.unlock();
        }
    }

    /**
     * Starts sending the request body.
     * <p>
     * When there is a request publisher a new {@code RequestSubscriber} is
     * subscribed to it; otherwise {@code requestBodyCF} is completed right
     * away. {@code requestSent()} is invoked once {@code requestBodyCF}
     * completes, normally or not.
     *
     * @return {@code requestBodyCF}
     */
    CompletableFuture<Void> sendBodyImpl() {
        requestBodyCF.whenComplete((ignoredResult, ignoredError) -> requestSent());
        try {
            if (requestPublisher == null) {
                // there is no request body, therefore the request is complete,
                // END_STREAM has already been sent with the outgoing headers
                requestBodyCF.complete(null);
            } else {
                final RequestSubscriber bodySubscriber = new RequestSubscriber(requestContentLen);
                requestSubscriber = bodySubscriber;
                requestPublisher.subscribe(bodySubscriber);
            }
        } catch (Throwable failure) {
            cancelImpl(failure);
            requestBodyCF.completeExceptionally(failure);
        }
        return requestBodyCF;
    }

    /**
     * Cancels this stream with a generic {@code IOException} whose message
     * mentions the stream id when one has been assigned.
     */
    @Override
    void cancel() {
        final String reason = (streamid == 0)
                ? "Stream cancelled before streamid assigned"
                : "Stream " + streamid + " cancelled";
        cancel(new IOException(reason));
    }

    /**
     * Handles an error raised by the downstream subscriber's use of the
     * subscription.
     * <p>
     * This is the special case where the subscriber has requested an
     * illegal number of items. The error doesn't come from upstream but
     * from downstream, so it has to be handled without waiting for the
     * inputQ to be exhausted: the error is recorded (if none was recorded
     * before), {@code stopRequested} is raised and the scheduler is run.
     *
     * @param t the subscription error
     */
    void onSubscriptionError(Throwable t) {
        this.errorRef.compareAndSet(null, t);
        if (debug.on()) {
            debug.log("Got subscription error: %s", (Object)t);
        }
        this.stopRequested = true;
        this.sched.runOrSchedule();
    }

    /**
     * Cancels this stream with the given cause by delegating to
     * {@code cancelImpl(Throwable)}.
     *
     * @param cause the reason for the cancellation
     */
    @Override
    void cancel(IOException cause) {
        this.cancelImpl(cause);
    }

    /**
     * Reports a protocol error on this stream, using the
     * {@code PROTOCOL_ERROR} reset code.
     *
     * @param cause the protocol error
     */
    @Override
    void onProtocolError(final IOException cause) {
        final int resetCode = ResetFrame.PROTOCOL_ERROR;
        onProtocolError(cause, resetCode);
    }

    /**
     * Logs the protocol error and cancels the exchange, resetting the
     * stream with the given error code.
     *
     * @param cause the protocol error
     * @param code  the error code passed on to {@code cancelImpl}
     */
    void onProtocolError(final IOException cause, int code) {
        if (debug.on()) {
            final String codeName = ErrorFrame.stringForCode(code);
            debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
                    streamid, codeName, cause.getMessage());
        }
        Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
        // send a RESET frame and close the stream
        cancelImpl(cause, code);
    }

    /**
     * Called when the connection is closing. Records the cause (if no
     * error was recorded before), then either runs the scheduler - when a
     * subscriber exists, the scheduler is not stopped and the input queue
     * is not empty - or cancels the stream.
     *
     * @param cause the reason the connection is closing
     */
    void connectionClosing(Throwable cause) {
        final Flow.Subscriber<?> subscriber =
                responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
        errorRef.compareAndSet(null, cause);
        final boolean drainPending =
                subscriber != null && !sched.isStopped() && !inputQ.isEmpty();
        if (drainPending) {
            sched.runOrSchedule();
            return;
        }
        cancelImpl(cause);
    }

    /**
     * Cancels the stream using the {@code CANCEL} reset code.
     * This method sends a RST_STREAM frame.
     *
     * @param e the reason for the cancellation
     */
    void cancelImpl(Throwable e) {
        final int resetCode = ResetFrame.CANCEL;
        cancelImpl(e, resetCode);
    }

    /**
     * Cancels this stream: records the error, marks the stream closed,
     * completes the subscriber and the pending futures exceptionally, and,
     * when the stream has an id and {@code streamState} is still 0, either
     * closes it on the connection (EOF case) or sends a reset frame.
     * The input queue is always drained at the end.
     *
     * @param e                 the reason for the cancellation; recorded in
     *                          {@code errorRef} only if no error was recorded before
     * @param resetFrameErrCode the error code passed to
     *                          {@code sendResetStreamFrame} if a reset is sent
     */
    void cancelImpl(final Throwable e, final int resetFrameErrCode) {
        // first error wins: a previously recorded error is not overwritten
        errorRef.compareAndSet(null, e);
        if (debug.on()) {
            if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
            else debug.log("cancelling stream %d: %s", streamid, e);
        }
        if (Log.trace()) {
            if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
            else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
        }
        // 'closing' ends up true only for the caller that flips 'closed'
        // from false to true; the flag is re-checked while holding stateLock.
        boolean closing;
        if (closing = !closed) { // assigning closing to !closed
            stateLock.lock();
            try {
                if (closing = !closed) { // assigning closing to !closed
                    closed=true;
                }
            } finally {
                stateLock.unlock();
            }
        }

        if (closing) { // true if the stream has not been closed yet
            // prefer the registered subscriber, fall back to the pending one
            var subscriber = this.responseSubscriber;
            if (subscriber == null) subscriber = this.pendingResponseSubscriber;
            if (subscriber != null) {
                if (debug.on())
                    debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get());
                sched.runOrSchedule();
                if (subscriber instanceof Http2StreamResponseSubscriber<?> rs) {
                    // make sure the subscriber is stopped.
                    if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
                    rs.complete(errorRef.get());
                }
            } else {
                if (debug.on())
                    debug.log("stream %s closing due to %s before subscriber registered",
                            streamid, (Object)errorRef.get());
            }
        } else {
            if (debug.on()) {
                debug.log("stream %s already closed due to %s",
                        streamid, (Object)errorRef.get());
            }
        }

        // Note: the response is completed with 'e', whereas the body futures
        // below are completed with whatever error was recorded first.
        completeResponseExceptionally(e);
        if (!requestBodyCF.isDone()) {
            requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
        }
        if (responseBodyCF != null) {
            responseBodyCF.completeExceptionally(errorRef.get());
        }
        try {
            // will send a RST_STREAM frame
            // (only if the stream has an id and streamState is still 0)
            if (streamid != 0 && streamState == 0) {
                final Throwable cause = Utils.getCompletionCause(e);
                if (cause instanceof EOFException) {
                    // read EOF: no need to try & send reset
                    connection.decrementStreamsCount(streamid);
                    connection.closeStream(streamid);
                } else {
                    // no use to send CANCEL if already closed.
                    sendResetStreamFrame(resetFrameErrCode);
                }
            }
        } catch (Throwable ex) {
            // failures while resetting/closing are logged, not propagated
            Log.logError(ex);
        } finally {
            // runs on every path, including when the reset attempt throws
            drainInputQueue();
        }
    }

    /**
     * Asks the connection to reset this stream with the given error code,
     * then closes the stream. The reset is only requested when the stream
     * already has a stream id and {@code markStream} returns 0;
     * {@code close()} is called in every case.
     *
     * @param resetFrameErrCode the error code to send in the reset
     */
    void sendResetStreamFrame(final int resetFrameErrCode) {
        // do not reset a stream until it has a streamid.
        final boolean mayReset = streamid > 0 && markStream(resetFrameErrCode) == 0;
        if (mayReset) {
            connection.resetStream(streamid, resetFrameErrCode);
        }
        close();
    }

    /**
     * Closes this stream without sending any frame.
     * <p>
     * The {@code closed} flag is set at most once; a caller that finds it
     * already set returns immediately. The caller that sets it then calls
     * {@code connection.closeStream(streamid)} and, if the current (or pending)
     * subscriber is an {@code Http2StreamResponseSubscriber} that has not
     * completed yet, completes it with the recorded error, or with a generic
     * "stream closed" {@code IOException} when no error was recorded.
     */
    // This method doesn't send any frame
    void close() {
        // unlocked fast path, then re-check while holding stateLock
        if (closed) return;
        stateLock.lock();
        try {
            if (closed) return;
            closed = true;
        } finally {
            stateLock.unlock();
        }
        if (debug.on()) debug.log("close stream %d", streamid);
        Log.logTrace("Closing stream {0}", streamid);
        connection.closeStream(streamid);
        // prefer the registered subscriber, fall back to the pending one
        var s = responseSubscriber == null
                ? pendingResponseSubscriber
                : responseSubscriber;
        if (debug.on()) debug.log("subscriber is %s", s);
        if (s instanceof Http2StreamResponseSubscriber<?> sw) {
            if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
            // if the subscriber has already completed,
            // there is nothing to do...
            if (!sw.completed()) {
                // otherwise make sure it will be completed
                var cause = errorRef.get();
                sw.complete(cause == null ? new IOException("stream closed") : cause);
            }
        }
        Log.logTrace("Stream {0} closed", streamid);
    }

    /**
     * A {@code Stream} used for a pushed response. It keeps a reference to
     * the stream it was pushed on ({@code parent}) and to the {@code PushGroup}
     * in which errors from the asynchronous operations are recorded.
     *
     * @param <T> the response body type
     */
    static class PushedStream<T> extends Stream<T> {
        final Stream<T> parent;
        final PushGroup<T> pushGroup;
        // push streams need the response CF allocated up front as it is
        // given directly to user via the multi handler callback function.
        final CompletableFuture<Response> pushCF;
        // completed in completeResponse once the body has been read (or failed)
        CompletableFuture<HttpResponse<T>> responseCF;
        final HttpRequestImpl pushReq;
        // body handler used to read the pushed body; set via setPushHandler
        volatile HttpResponse.BodyHandler<T> pushHandler;
        // set once handleResponse has accepted a set of response headers;
        // header blocks received after that are only logged as trailers
        private volatile boolean finalPushResponseCodeReceived;

        /**
         * Creates a pushed stream.
         *
         * @param parent     the stream this push is associated with
         * @param pushGroup  the group in which push errors are recorded
         * @param connection the HTTP/2 connection carrying the stream
         * @param pushReq    the exchange for the pushed request
         */
        PushedStream(Stream<T> parent,
                     PushGroup<T> pushGroup,
                     Http2Connection connection,
                     Exchange<T> pushReq) {
            // ## no request body possible, null window controller
            super(connection, pushReq, null);
            this.parent = parent;
            this.pushGroup = pushGroup;
            this.pushReq = pushReq.request();
            this.pushCF = new MinimalFuture<>();
            this.responseCF = new MinimalFuture<>();
        }

        /** Returns the future completed with the full pushed response. */
        CompletableFuture<HttpResponse<T>> responseCF() {
            return responseCF;
        }

        /** Sets the body handler used to read the pushed response body. */
        void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
            this.pushHandler = pushHandler;
        }

        /** Returns the body handler previously set, or {@code null} if none was set. */
        HttpResponse.BodyHandler<T> getPushHandler() {
            // ignored parameters to function can be used as BodyHandler
            return this.pushHandler;
        }

        // Following methods call the super class but in case of
        // error record it in the PushGroup. The error method is called
        // with a null value when no error occurred (is a no-op)
        @Override
        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
            return super.sendBodyAsync()
                        .whenComplete((ExchangeImpl<T> v, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        @Override
        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
            return super.sendHeadersAsync()
                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
        }

        // Unlike the other overrides, this one does not call the super class:
        // the result is derived from pushCF. When an executor is supplied and
        // the future is not done yet, dependents are moved onto that executor.
        @Override
        CompletableFuture<Response> getResponseAsync(Executor executor) {
            CompletableFuture<Response> cf = pushCF.whenComplete(
                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
            if(executor!=null && !cf.isDone()) {
                cf  = cf.thenApplyAsync( r -> r, executor);
            }
            return cf;
        }

        // NOTE(review): here the throwable is passed to pushError as-is,
        // whereas the overrides above unwrap it with Utils.getCompletionCause
        // first - confirm whether the difference is intentional.
        @Override
        CompletableFuture<T> readBodyAsync(
                HttpResponse.BodyHandler<T> handler,
                boolean returnConnectionToPool,
                Executor executor)
        {
            return super.readBodyAsync(handler, returnConnectionToPool, executor)
                        .whenComplete((v, t) -> pushGroup.pushError(t));
        }

        // Completes pushCF with the response, then reads the body with the
        // push handler and completes responseCF with the outcome.
        @Override
        void completeResponse(Response r) {
            Log.logResponse(r::toString);
            pushCF.complete(r); // not strictly required for push API
            // start reading the body using the obtained BodySubscriber
            // 'start' is completed asynchronously on the exchange's executor
            // (last statement); presumably so that the body reading chained
            // below is kicked off from an executor thread - TODO confirm.
            CompletableFuture<Void> start = new MinimalFuture<>();
            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                .whenComplete((T body, Throwable t) -> {
                    if (t != null) {
                        responseCF.completeExceptionally(t);
                    } else {
                        HttpResponseImpl<T> resp =
                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
                        responseCF.complete(resp);
                    }
                });
            start.completeAsync(() -> null, getExchange().executor());
        }

        // Failures are reported through pushCF only; responseCF is not
        // touched by this method.
        @Override
        void completeResponseExceptionally(Throwable t) {
            pushCF.completeExceptionally(t);
        }

        // Processes a complete block of decoded headers. The first accepted
        // block builds the Response and hands it to completeResponse; any
        // later block is only logged as trailing headers. The headers
        // consumer is reset on every path.
        @Override
        protected void handleResponse(HeaderFrame hf) {
            HttpHeaders responseHeaders = responseHeadersBuilder.build();

            if (!finalPushResponseCodeReceived) {
                responseCode = (int)responseHeaders
                    .firstValueAsLong(":status")
                    .orElse(-1);

                if (responseCode == -1) {
                    // no :status pseudo-header: cancel with PROTOCOL_ERROR
                    cancelImpl(new ProtocolException("No status code"), ResetFrame.PROTOCOL_ERROR);
                    rspHeadersConsumer.reset();
                    return;
                } else if (responseCode >= 100 && responseCode < 200) {
                    // interim (1xx) response: a non-null message from the
                    // count check cancels the stream with PROTOCOL_ERROR
                    String protocolErrorMsg = checkInterimResponseCountExceeded();
                    if (protocolErrorMsg != null) {
                        cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
                        rspHeadersConsumer.reset();
                        return;
                    }
                    // NOTE(review): a 1xx status that passes the check falls
                    // through and is handled below exactly like a final
                    // response - confirm this is intended for pushed streams.
                }

                this.finalPushResponseCodeReceived = true;

                this.response = new Response(
                        pushReq, exchange, responseHeaders, connection(),
                        responseCode, HttpClient.Version.HTTP_2);

                /* TODO: review if needs to be removed
                   the value is not used, but in case `content-length` doesn't parse
                   as long, there will be NumberFormatException. If left as is, make
                   sure code up the stack handles NFE correctly. */
                responseHeaders.firstValueAsLong("content-length");

                if (Log.headers()) {
                    StringBuilder sb = new StringBuilder("RESPONSE HEADERS (streamid=%s):\n".formatted(streamid));
                    sb.append("  %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
                    Log.dumpHeaders(sb, "    ", responseHeaders);
                    Log.logHeaders(sb.toString());
                }

                rspHeadersConsumer.reset();

                // different implementations for normal streams and pushed streams
                completeResponse(response);
            } else {
                // headers already accepted: this block is only logged as trailers
                if (Log.headers()) {
                    StringBuilder sb = new StringBuilder("TRAILING HEADERS (streamid=%s):\n".formatted(streamid));
                    sb.append("  %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
                    Log.dumpHeaders(sb, "    ", responseHeaders);
                    Log.logHeaders(sb.toString());
                }
                rspHeadersConsumer.reset();
            }
        }
    }

    /**
     * The {@code WindowUpdateSender} bound to this stream: it reports this
     * stream's id and turns an exceeded flow-control window into a protocol
     * error carrying {@code ResetFrame.FLOW_CONTROL_ERROR}.
     */
    final class StreamWindowUpdateSender extends WindowUpdateSender {

        StreamWindowUpdateSender(Http2Connection connection) {
            super(connection);
        }

        /** Returns the id of the enclosing stream. */
        @Override
        int getStreamId() {
            return streamid;
        }

        /**
         * Returns the debug tag for this sender. The tag is cached only once
         * the stream id is non-zero; until then a tag with a {@code ?}
         * placeholder is computed on every call.
         */
        @Override
        String dbgString() {
            String cached = dbgString;
            if (cached != null) {
                return cached;
            }
            if (streamid == 0) {
                // no stream id yet: do not cache this provisional tag
                return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
            }
            cached = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
            dbgString = cached;
            return cached;
        }

        /**
         * Reports a flow-control violation on the enclosing stream.
         *
         * @param received not used by this implementation
         * @return always {@code true}
         */
        @Override
        protected boolean windowSizeExceeded(long received) {
            final String message = "stream %s flow control window exceeded".formatted(streamid);
            onProtocolError(new ProtocolException(message), ResetFrame.FLOW_CONTROL_ERROR);
            return true;
        }
    }

    /**
     * Tells whether this exchange was canceled, that is, whether an error
     * has been recorded for it.
     * @return true if this exchange was canceled.
     */
    boolean isCanceled() {
        final Throwable recorded = errorRef.get();
        return recorded != null;
    }

    /**
     * Gives access to the error recorded for this exchange.
     * @return the cause for which this exchange was canceled,
     *         or {@code null} if no error has been recorded.
     */
    Throwable getCancelCause() {
        final Throwable recorded = errorRef.get();
        return recorded;
    }

    /**
     * Builds the debug tag for this stream: the connection's tag followed by
     * {@code /Stream(id)}, where the id is shown as {@code ?} while it is 0.
     */
    final String dbgString() {
        final int id = streamid;
        final String label;
        if (id == 0) {
            label = "?";
        } else {
            label = String.valueOf(id);
        }
        return connection.dbgString() + "/Stream(" + label + ")";
    }

    /**
     * Closes this exchange as one that the peer has not processed.
     * <p>
     * The local end of the connection learns about unprocessed exchanges when it
     * receives a GOAWAY frame whose stream id tells which exchanges were not
     * processed by the peer. This method is called on each such exchange and
     * arranges for the corresponding request to be retried afresh on a new
     * connection.
     */
    void closeAsUnprocessed() {
        try {
            // RFC-9113 allows the request to be retried on a new connection
            markUnprocessedByPeer();
            final IOException notProcessed = new IOException("request not processed by peer");
            this.errorRef.compareAndSet(null, notProcessed);
            if (debug.on()) {
                debug.log("closing " + this.request + " as unprocessed by peer");
            }
            // close the exchange, then fail the response CF with the recorded error
            close();
            final Throwable recorded = this.errorRef.get();
            completeResponseExceptionally(recorded);
        } finally {
            // Not strictly needed, but keeps the counts/states reported
            // in the log messages accurate.
            connection.decrementStreamsCount(streamid);
        }
    }

    /**
     * Receives the decoded response header fields of the enclosing stream,
     * validates them through {@code ValidatingHeadersConsumer} and adds them
     * to {@code responseHeadersBuilder}. Validation failures and an exceeded
     * maximum header list size are reported as protocol errors on the stream.
     */
    private final class HeadersConsumer extends ValidatingHeadersConsumer
            implements DecodingCallback {

        private HeadersConsumer() {
            super(Context.RESPONSE);
        }

        // once set, further decoded header fields are ignored by onDecoded
        boolean maxHeaderListSizeReached;

        /**
         * Resets the validation state and clears the headers accumulated
         * so far in {@code responseHeadersBuilder}.
         */
        @Override
        public void reset() {
            super.reset();
            responseHeadersBuilder.clear();
            // guarded like every other debug.log call in this file
            if (debug.on()) {
                debug.log("Response builder cleared, ready to receive new headers.");
            }
        }

        /**
         * Validates one decoded header field and adds it to the response
         * headers. A validation failure is not propagated to the caller:
         * it is reported through {@code onProtocolError} instead.
         *
         * @param name  the decoded header name
         * @param value the decoded header value
         */
        @Override
        public void onDecoded(CharSequence name, CharSequence value)
            throws UncheckedIOException
        {
            if (maxHeaderListSizeReached) {
                return;
            }
            try {
                String n = name.toString();
                String v = value.toString();
                super.onDecoded(n, v);
                responseHeadersBuilder.addHeader(n, v);
                if (Log.headers() && Log.trace()) {
                    Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
                            streamid, n, v);
                }
            } catch (UncheckedIOException uio) {
                // reset stream: From RFC 9113, section 8.1
                // Malformed requests or responses that are detected MUST be
                // treated as a stream error (Section 5.4.2) of type
                // PROTOCOL_ERROR.
                onProtocolError(uio.getCause());
            }
        }

        /** Prefixes validation messages with {@code "malformed response: "}. */
        @Override
        protected String formatMessage(String message, String header) {
            return "malformed response: " + super.formatMessage(message, header);
        }

        /**
         * Called when the decoded header list exceeds the maximum size.
         * The first notification cancels the stream (and, for a pushed
         * stream, its parent) with a protocol error and discards the headers
         * accumulated so far; later notifications are ignored.
         *
         * @param size              the header list size passed to the default handler
         * @param maxHeaderListSize the limit passed to the default handler
         */
        @Override
        public void onMaxHeaderListSizeReached(long size, int maxHeaderListSize) throws ProtocolException {
            if (maxHeaderListSizeReached) return;
            try {
                DecodingCallback.super.onMaxHeaderListSizeReached(size, maxHeaderListSize);
            } catch (ProtocolException cause) {
                maxHeaderListSizeReached = true;
                // If this is a push stream: cancel the parent.
                if (Stream.this instanceof Stream.PushedStream<?> ps) {
                    ps.parent.onProtocolError(cause);
                }
                // cancel the stream, continue processing
                onProtocolError(cause);
                reset();
            }
        }
    }

    /**
     * The body subscriber wrapper used by this stream. Registration and
     * unregistration are forwarded to the enclosing stream's
     * {@code registerResponseSubscriber} and
     * {@code unregisterResponseSubscriber} methods.
     *
     * @param <U> the type produced by the wrapped subscriber
     */
    final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U> {

        Http2StreamResponseSubscriber(final BodySubscriber<U> subscriber) {
            super(subscriber);
        }

        /** Registers this wrapper with the enclosing stream. */
        @Override
        protected void register() {
            registerResponseSubscriber(this);
        }

        /** Unregisters this wrapper from the enclosing stream. */
        @Override
        protected void unregister() {
            unregisterResponseSubscriber(this);
        }
    }

    // VarHandles for the streamState (int) and deRegistered (boolean)
    // fields of Stream; both are resolved once, at class initialization.
    private static final VarHandle STREAM_STATE;
    private static final VarHandle DEREGISTERED;
    static {
        final Class<?> owner = Stream.class;
        try {
            final MethodHandles.Lookup lookup = MethodHandles.lookup();
            STREAM_STATE = lookup.findVarHandle(owner, "streamState", int.class);
            DEREGISTERED = lookup.findVarHandle(owner, "deRegistered", boolean.class);
        } catch (Exception x) {
            // any lookup failure makes the class unusable
            throw new ExceptionInInitializerError(x);
        }
    }
}
