diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java index 196f7a1d98..7e997a0cbc 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java @@ -31,6 +31,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import org.apache.hc.core5.annotation.Contract; @@ -62,6 +65,14 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer trailers = Collections.synchronizedList(new ArrayList<>()); private final BasicFuture>> responseFuture; + private final CompletableFuture>> responseCompletableFuture; + private final CompletableFuture responseCompletionFuture; + + /** + * Completes with {@code null} on success and with the terminal {@link Throwable} on failure. + * This future never completes exceptionally. + */ + private final CompletableFuture failureFuture; private volatile BasicFuture responseCompletion; private volatile HttpResponse informationResponse; @@ -72,6 +83,9 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer(null); + this.responseCompletableFuture = new CompletableFuture<>(); + this.responseCompletionFuture = new CompletableFuture<>(); + this.failureFuture = new CompletableFuture<>(); } /** @@ -82,12 +96,65 @@ public ReactiveResponseConsumer() { */ public ReactiveResponseConsumer(final FutureCallback>> responseCallback) { this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback")); + this.responseCompletableFuture = new CompletableFuture<>(); + this.responseCompletionFuture = new CompletableFuture<>(); + this.failureFuture = new CompletableFuture<>(); } public Future>> getResponseFuture() { return responseFuture; } + /** + * Returns a {@link CompletableFuture} that completes when the response head and body {@link Publisher} + * are available. + * + * @since 5.5 + */ + public CompletableFuture>> getResponseCompletableFuture() { + return responseCompletableFuture; + } + + /** + * Returns a {@link CompletableFuture} that completes when the response exchange is complete + * (end-of-stream reached and trailers processed, if any). + * + * @since 5.5 + */ + public CompletableFuture getResponseCompletionFuture() { + return responseCompletionFuture; + } + + /** + * Returns a {@link CompletionStage} that completes when the response head and body {@link Publisher} + * are available. + * + * @since 5.5 + */ + public CompletionStage>> getResponseStage() { + return responseCompletableFuture; + } + + /** + * Returns a {@link CompletionStage} that completes when the response exchange is complete + * (end-of-stream reached and trailers processed, if any). + * + * @since 5.5 + */ + public CompletionStage getResponseCompletionStage() { + return responseCompletionFuture; + } + + /** + * Completes with {@code null} on success and with the terminal {@link Throwable} on failure. + * This stage never completes exceptionally. + * + * @since 5.5 + */ + public CompletionStage getFailureStage() { + return failureFuture; + } + /** * Returns the intermediate (1xx) HTTP response if one was received. * @@ -124,7 +191,11 @@ public void consumeResponse( ) { this.entityDetails = entityDetails; this.responseCompletion = new BasicFuture<>(resultCallback); - this.responseFuture.completed(new Message<>(response, reactiveDataConsumer)); + + final Message> message = new Message<>(response, reactiveDataConsumer); + this.responseFuture.completed(message); + this.responseCompletableFuture.complete(message); + if (entityDetails == null) { streamEnd(null); } @@ -139,8 +210,15 @@ public void informationResponse(final HttpResponse response, final HttpContext h public void failed(final Exception cause) { reactiveDataConsumer.failed(cause); responseFuture.failed(cause); - if (responseCompletion != null) { - responseCompletion.failed(cause); + responseCompletableFuture.completeExceptionally(cause); + responseCompletionFuture.completeExceptionally(cause); + + // Record failure as a normal completion value. + failureFuture.complete(cause); + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.failed(cause); } } @@ -160,15 +238,39 @@ public void streamEnd(final List trailers) { this.trailers.addAll(trailers); } reactiveDataConsumer.streamEnd(trailers); - responseCompletion.completed(null); + + // Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()). + responseCompletionFuture.complete(null); + + // Success => no failure. + failureFuture.complete(null); + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.completed(null); + } } @Override public void releaseResources() { reactiveDataConsumer.releaseResources(); + responseFuture.cancel(); - if (responseCompletion != null) { - responseCompletion.cancel(); + + if (!responseCompletableFuture.isDone()) { + responseCompletableFuture.cancel(true); + } + if (!responseCompletionFuture.isDone()) { + responseCompletionFuture.cancel(true); + } + + if (!failureFuture.isDone()) { + failureFuture.complete(new CancellationException()); + } + + final BasicFuture completion = responseCompletion; + if (completion != null) { + completion.cancel(); } } } diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java new file mode 100644 index 0000000000..c2c25a1a79 --- /dev/null +++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java @@ -0,0 +1,246 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactive.examples; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.InetAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.impl.Http1StreamListener; +import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; + +/** + * Client demo using CompletionStage accessors on ReactiveResponseConsumer (Java 8). + */ +public final class ReactiveClientCompletionStageExample { + + private ReactiveClientCompletionStageExample() { + } + + private static CompletionStage withTimeout( + final CompletableFuture future, + final ScheduledExecutorService scheduler, + final long timeout, + final TimeUnit unit) { + + final CompletableFuture timeoutFuture = new CompletableFuture<>(); + final java.util.concurrent.ScheduledFuture task = scheduler.schedule( + () -> timeoutFuture.completeExceptionally(new TimeoutException("Timeout after " + timeout + " " + unit)), + timeout, unit); + + final CompletableFuture combined = future.applyToEither(timeoutFuture, t -> t); + combined.whenComplete((v, ex) -> task.cancel(false)); + return combined; + } + + private static CompletableFuture readBodyAsString(final Publisher publisher) { + final CompletableFuture bodyFuture = new CompletableFuture<>(); + + Observable.fromPublisher(publisher) + .map(buf -> { + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return new String(bytes, UTF_8); + }) + .reduce(new StringBuilder(), StringBuilder::append) + .map(StringBuilder::toString) + .subscribe( + bodyFuture::complete, + bodyFuture::completeExceptionally); + + return bodyFuture; + } + + private static boolean isLoopbackHost(final String host) { + if (host == null) { + return false; + } + return "localhost".equalsIgnoreCase(host) || "127.0.0.1".equals(host) || "::1".equals(host); + } + + private static URI normalizeAuthority(final URI uri) { + if (!isLoopbackHost(uri.getHost())) { + return uri; + } + try { + final InetAddress local = InetAddress.getLocalHost(); + final String canonical = local.getCanonicalHostName(); + if (canonical != null && !canonical.isEmpty() && !isLoopbackHost(canonical)) { + return new URI(uri.getScheme(), null, canonical, uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()); + } + final String addr = local.getHostAddress(); + if (addr != null && !addr.isEmpty() && !isLoopbackHost(addr)) { + return new URI(uri.getScheme(), null, addr, uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()); + } + } catch (final Exception ignore) { + // fall back + } + return uri; + } + + public static void main(final String[] args) throws Exception { + String endpoint = "http://localhost:8080/echo"; + if (args.length >= 1) { + endpoint = args[0]; + } + + final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() + .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build()) + .setStreamListener(new Http1StreamListener() { + + @Override + public void onRequestHead(final HttpConnection connection, final HttpRequest request) { + System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request)); + } + + @Override + public void onResponseHead(final HttpConnection connection, final HttpResponse response) { + System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response)); + } + + @Override + public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { + if (keepAlive) { + System.out.println(connection.getRemoteAddress() + + " exchange completed (connection kept alive)"); + } else { + System.out.println(connection.getRemoteAddress() + + " exchange completed (connection closed)"); + } + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "timeout-scheduler"); + t.setDaemon(true); + return t; + }); + + try { + final URI uri = normalizeAuthority(new URI(endpoint)); + + final Random random = new Random(); + final Flowable requestBody = Flowable.range(1, 100) + .map(i -> ByteBuffer.wrap((i + ":" + random.nextDouble() + "\n").getBytes(UTF_8))); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(uri) + .setEntity(new ReactiveEntityProducer(requestBody, -1, ContentType.TEXT_PLAIN, null)) + .build(); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null); + + consumer.getFailureStage().whenComplete((t, ex) -> { + if (ex != null) { + ex.printStackTrace(); + } else if (t != null) { + System.out.println("Request failed: " + t); + } + }); + + final CompletableFuture printedAndDrained = new CompletableFuture<>(); + + consumer.getResponseStage().whenComplete((msg, ex) -> { + if (ex != null) { + printedAndDrained.completeExceptionally(ex); + return; + } + try { + System.out.println(msg.getHead()); + for (final Header h : msg.getHead().getHeaders()) { + System.out.println(h); + } + System.out.println(); + + readBodyAsString(msg.getBody()).whenComplete((body, ex2) -> { + if (ex2 != null) { + printedAndDrained.completeExceptionally(ex2); + } else { + if (body != null && !body.isEmpty()) { + System.out.print(body); + if (!body.endsWith("\n")) { + System.out.println(); + } + } + printedAndDrained.complete(null); + } + }); + } catch (final RuntimeException e) { + printedAndDrained.completeExceptionally(e); + } + }); + + final CompletableFuture exchangeDone = consumer.getResponseCompletionStage().toCompletableFuture(); + + final CompletableFuture both = CompletableFuture.allOf(printedAndDrained, exchangeDone); + withTimeout(both, scheduler, 60, TimeUnit.SECONDS).toCompletableFuture().get(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + + } finally { + scheduler.shutdownNow(); + } + } +} diff --git a/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java new file mode 100644 index 0000000000..9dc7fb5068 --- /dev/null +++ b/httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveCompletableFuturesExample.java @@ -0,0 +1,207 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactive.examples; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.Http1StreamListener; +import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactive.ReactiveEntityProducer; +import org.apache.hc.core5.reactive.ReactiveResponseConsumer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; +import org.reactivestreams.Publisher; + +/** + * Java 8 compatible client example using ReactiveResponseConsumer's new CompletableFuture API. + *

+ * Uses: + * - consumer.getResponseCompletableFuture(): response head + body publisher ready + * - consumer.getResponseCompletionFuture(): exchange fully complete + */ +public final class ReactiveCompletableFuturesExample { + + private ReactiveCompletableFuturesExample() { + } + + private static CompletableFuture withTimeout( + final CompletableFuture future, + final ScheduledExecutorService scheduler, + final long timeout, + final TimeUnit unit) { + + final CompletableFuture timeoutFuture = new CompletableFuture<>(); + final java.util.concurrent.ScheduledFuture task = scheduler.schedule( + () -> timeoutFuture.completeExceptionally(new TimeoutException("Timeout after " + timeout + " " + unit)), + timeout, unit); + + final CompletableFuture combined = future.applyToEither(timeoutFuture, t -> t); + combined.whenComplete((v, ex) -> task.cancel(false)); + return combined; + } + + private static CompletableFuture readBodyAsString(final Publisher publisher) { + final CompletableFuture bodyFuture = new CompletableFuture<>(); + + Observable.fromPublisher(publisher) + .map(buf -> { + final byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + return new String(bytes, UTF_8); + }) + .reduce(new StringBuilder(), StringBuilder::append) + .map(StringBuilder::toString) + .subscribe( + bodyFuture::complete, + bodyFuture::completeExceptionally); + + return bodyFuture; + } + + public static void main(final String[] args) throws Exception { + String endpoint = "http://manjaro:8080/echo"; + if (args.length >= 1) { + endpoint = args[0]; + } + + final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() + .setIOReactorConfig(IOReactorConfig.custom().setSoTimeout(5, TimeUnit.SECONDS).build()) + .setStreamListener(new Http1StreamListener() { + + @Override + public void onRequestHead(final HttpConnection connection, final HttpRequest request) { + System.out.println(connection.getRemoteAddress() + " " + new RequestLine(request)); + } + + @Override + public void onResponseHead(final HttpConnection connection, final HttpResponse response) { + System.out.println(connection.getRemoteAddress() + " " + new StatusLine(response)); + } + + @Override + public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { + if (keepAlive) { + System.out.println(connection.getRemoteAddress() + " exchange completed (connection kept alive)"); + } else { + System.out.println(connection.getRemoteAddress() + " exchange completed (connection closed)"); + } + } + + }) + .create(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("HTTP requester shutting down"); + requester.close(CloseMode.GRACEFUL); + })); + + requester.start(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "timeout-scheduler"); + t.setDaemon(true); + return t; + }); + + try { + final URI uri = new URI(endpoint); + + final Random random = new Random(); + final Flowable requestBody = Flowable.range(1, 100) + .map(i -> ByteBuffer.wrap((i + ":" + random.nextDouble() + "\n").getBytes(UTF_8))); + + final AsyncRequestProducer requestProducer = AsyncRequestBuilder.post(uri) + .addHeader("X-Demo", "cfutures-java8") + .setEntity(new ReactiveEntityProducer(requestBody, -1, ContentType.TEXT_PLAIN, null)) + .build(); + + final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer(); + + requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null); + + final CompletableFuture printedAndDrained = consumer.getResponseCompletableFuture() + .thenCompose((final Message> streamingResponse) -> { + + final HttpResponse head = streamingResponse.getHead(); + final int code = head.getCode(); + + System.out.println(head); + for (final Header header : head.getHeaders()) { + System.out.println(header); + } + System.out.println(); + + return readBodyAsString(streamingResponse.getBody()).thenApply(body -> { + if (body != null && !body.isEmpty()) { + System.out.print(body); + if (!body.endsWith("\n")) { + System.out.println(); + } + } + if (code >= 400) { + System.out.println("Request failed: HTTP " + code + " " + head.getReasonPhrase()); + } + return null; + }); + }); + + final CompletableFuture exchangeDone = consumer.getResponseCompletionFuture(); + + final CompletableFuture both = CompletableFuture.allOf(printedAndDrained, exchangeDone); + withTimeout(both, scheduler, 60, TimeUnit.SECONDS).get(); + + System.out.println("Shutting down I/O reactor"); + requester.initiateShutdown(); + + } finally { + scheduler.shutdownNow(); + } + } +}