package io.prestosql.dispatcher;

import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.jaxrs.AsyncResponseHandler;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.prestosql.client.QueryError;
import io.prestosql.client.QueryResults;
import io.prestosql.client.StatementStats;
import io.prestosql.dispatcher.DispatcherConfig;
import io.prestosql.execution.ExecutionFailureInfo;
import io.prestosql.execution.QueryState;
import io.prestosql.server.HttpRequestSessionContext;
import io.prestosql.server.SessionContext;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.ErrorCode;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.StandardErrorCode;
import io.prestosql.spi.security.GroupProvider;
import io.prestosql.spi.security.Identity;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

@Path("/v1/statement")
/* loaded from: input_file:io/prestosql/dispatcher/QueuedStatementResource.class */
public class QueuedStatementResource {
    private static final Logger log = Logger.get(QueuedStatementResource.class);

    /** Upper bound on how long a status request is held open waiting for dispatch. */
    private static final Duration MAX_WAIT_TIME = new Duration(1.0d, TimeUnit.SECONDS);

    /** Natural ordering with nulls last, so {@code min(MAX_WAIT_TIME, null)} yields {@code MAX_WAIT_TIME}. */
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();

    /** Zero duration reported for elapsed/queued time while a query has not finished submission. */
    private static final Duration NO_DURATION = new Duration(0.0d, TimeUnit.MILLISECONDS);

    private final DispatcherConfig.HeaderSupport forwardedHeaderSupport;
    private final GroupProvider groupProvider;
    private final DispatchManager dispatchManager;
    private final Executor responseExecutor;
    private final ScheduledExecutorService timeoutExecutor;

    /** Queries known to this resource, keyed by id; entries are removed by the purger task. */
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();

    /** Single-threaded scheduler that runs the periodic purge task scheduled in the constructor. */
    private final ScheduledExecutorService queryPurger = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed("dispatch-query-purger"));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/prestosql/dispatcher/QueuedStatementResource$Query.class */
    /**
     * State for a single posted statement while it is in the queued phase of the protocol.
     *
     * <p>Submission to the {@link DispatchManager} is lazy: {@code createQuery} is invoked the
     * first time either {@link #waitForDispatched()} or {@link #cancel()} is called. The
     * submission future is guarded by this object's monitor.
     */
    public static final class Query {
        private final String query;
        private final SessionContext sessionContext;
        private final DispatchManager dispatchManager;
        private final QueryId queryId;
        private final Slug slug = Slug.createNew();
        private final AtomicLong lastToken = new AtomicLong();

        /** Null until the query has been handed to the dispatch manager. */
        @GuardedBy("this")
        private ListenableFuture<?> querySubmissionFuture;

        /**
         * Creates the query and allocates its id from the dispatch manager.
         *
         * @throws NullPointerException if any argument is null
         */
        public Query(String str, SessionContext sessionContext, DispatchManager dispatchManager) {
            this.query = Objects.requireNonNull(str, "query is null");
            this.sessionContext = Objects.requireNonNull(sessionContext, "sessionContext is null");
            this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
            this.queryId = dispatchManager.createQueryId();
        }

        public QueryId getQueryId() {
            return queryId;
        }

        public Slug getSlug() {
            return slug;
        }

        public long getLastToken() {
            return lastToken.get();
        }

        /** Returns true once the query has been submitted and the submission future is done. */
        public synchronized boolean isSubmissionFinished() {
            return querySubmissionFuture != null && querySubmissionFuture.isDone();
        }

        /**
         * Submits the query if that has not happened yet and returns a future to wait on: the
         * submission future while submission is in flight, otherwise the dispatch manager's
         * dispatched future for this query.
         */
        public ListenableFuture<?> waitForDispatched() {
            synchronized (this) {
                ListenableFuture<?> submission = submitIfNeeded();
                if (!submission.isDone()) {
                    return submission;
                }
            }
            // Submission is complete; no local state is touched, so call out without holding the lock.
            return dispatchManager.waitForDispatched(queryId);
        }

        /**
         * Builds the protocol response for the given token.
         *
         * @param j the token from the request; must equal the last token seen or the next one
         * @throws WebApplicationException with 410 GONE when the token is neither the last nor the
         *         next token, or 404 NOT_FOUND when submission finished but the dispatch manager
         *         has no dispatch info for the query
         */
        public QueryResults getQueryResults(long j, UriInfo uriInfo, String str) {
            long last = lastToken.get();
            if (j != last && j != last + 1) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            // Advance to (or stay at) the requested token; a lost race is harmless here.
            lastToken.compareAndSet(last, j);

            if (!isSubmissionFinished()) {
                // Nothing to report yet: answer with a placeholder "queued" result.
                return createQueryResults(j + 1, uriInfo, str, DispatchInfo.queued(NO_DURATION, NO_DURATION));
            }

            Optional<DispatchInfo> dispatchInfo = dispatchManager.getDispatchInfo(queryId);
            if (!dispatchInfo.isPresent()) {
                throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND).build());
            }
            return createQueryResults(j + 1, uriInfo, str, dispatchInfo.get());
        }

        /**
         * Requests cancellation of the query once its submission has completed.
         *
         * <p>If the query was never submitted (no status request arrived before the cancel),
         * it is submitted first. Previously this case dereferenced a null future and failed
         * with a {@link NullPointerException}, leaving the entry unsubmitted and therefore
         * never eligible for purging.
         */
        public synchronized void cancel() {
            submitIfNeeded().addListener(() -> dispatchManager.cancelQuery(queryId), MoreExecutors.directExecutor());
        }

        /** Destroys the identity held by this query's session context. */
        public void destroy() {
            sessionContext.getIdentity().destroy();
        }

        /** Hands the query to the dispatch manager on first use; must be called with the lock held. */
        @GuardedBy("this")
        private ListenableFuture<?> submitIfNeeded() {
            if (querySubmissionFuture == null) {
                querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
            }
            return querySubmissionFuture;
        }

        private QueryResults createQueryResults(long j, UriInfo uriInfo, String str, DispatchInfo dispatchInfo) {
            URI nextUri = getNextUri(j, uriInfo, str, dispatchInfo);
            Optional<QueryError> queryError = dispatchInfo.getFailureInfo().map(this::toQueryError);
            return QueuedStatementResource.createQueryResults(
                    queryId,
                    nextUri,
                    queryError,
                    uriInfo,
                    str,
                    dispatchInfo.getElapsedTime(),
                    dispatchInfo.getQueuedTime());
        }

        /**
         * Returns the URI the client should request next: null when the query failed, the
         * executing-statement URI when a coordinator location is known, otherwise the next
         * queued URI for token {@code j}.
         */
        private URI getNextUri(long j, UriInfo uriInfo, String str, DispatchInfo dispatchInfo) {
            if (dispatchInfo.getFailureInfo().isPresent()) {
                return null;
            }
            return dispatchInfo.getCoordinatorLocation()
                    .map(coordinatorLocation -> getRedirectUri(coordinatorLocation, uriInfo, str))
                    .orElseGet(() -> getQueuedUri(queryId, slug, j, uriInfo, str));
        }

        private URI getRedirectUri(CoordinatorLocation coordinatorLocation, UriInfo uriInfo, String str) {
            return UriBuilder.fromUri(coordinatorLocation.getUri(uriInfo, str))
                    .replacePath("/v1/statement/executing")
                    .path(queryId.toString())
                    .path(slug.makeSlug(Slug.Context.EXECUTING_QUERY, 0L))
                    .path("0")
                    .build();
        }

        /** Converts a failure into a client error, substituting GENERIC_INTERNAL_ERROR when no code is set. */
        private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo) {
            ErrorCode errorCode = executionFailureInfo.getErrorCode();
            if (errorCode == null) {
                errorCode = StandardErrorCode.GENERIC_INTERNAL_ERROR.toErrorCode();
                log.warn("Failed query %s has no error code", queryId);
            }
            return new QueryError(
                    MoreObjects.firstNonNull(executionFailureInfo.getMessage(), "Internal error"),
                    (String) null,
                    errorCode.getCode(),
                    errorCode.getName(),
                    errorCode.getType().toString(),
                    executionFailureInfo.getErrorLocation(),
                    executionFailureInfo.toFailureInfo());
        }
    }

    /**
     * Wires the resource and starts the periodic purge task.
     *
     * <p>Every 200 ms the purge task walks a snapshot of the known queries and removes each one
     * whose submission has finished and which the dispatch manager no longer reports as
     * registered; the removed query's identity is then destroyed.
     *
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public QueuedStatementResource(DispatcherConfig dispatcherConfig, GroupProvider groupProvider, DispatchManager dispatchManager, DispatchExecutor dispatchExecutor) {
        Objects.requireNonNull(dispatcherConfig, "dispatcherConfig is null");
        this.forwardedHeaderSupport = dispatcherConfig.getForwardedHeaderSupport();
        this.groupProvider = Objects.requireNonNull(groupProvider, "groupProvider is null");
        this.dispatchManager = Objects.requireNonNull(dispatchManager, "dispatchManager is null");
        Objects.requireNonNull(dispatchExecutor, "dispatchExecutor is null");
        this.responseExecutor = dispatchExecutor.getExecutor();
        this.timeoutExecutor = dispatchExecutor.getScheduledExecutor();

        this.queryPurger.scheduleWithFixedDelay(() -> {
            // Catch everything: an exception escaping a scheduled task suppresses all later runs.
            try {
                for (Map.Entry<QueryId, Query> entry : ImmutableSet.copyOf(this.queries.entrySet())) {
                    if (!entry.getValue().isSubmissionFinished()) {
                        continue;
                    }
                    if (dispatchManager.isQueryRegistered(entry.getKey())) {
                        continue;
                    }
                    Query removed = this.queries.remove(entry.getKey());
                    if (removed != null) {
                        try {
                            removed.destroy();
                        } catch (Throwable destroyFailure) {
                            log.warn(destroyFailure, "Error destroying identity");
                        }
                    }
                }
            } catch (Throwable purgeFailure) {
                log.warn(purgeFailure, "Error removing old queries");
            }
        }, 200L, 200L, TimeUnit.MILLISECONDS);
    }

    /** Shuts down the purger scheduler immediately when the resource is destroyed. */
    @PreDestroy
    public void stop() {
        queryPurger.shutdownNow();
    }

    /**
     * Accepts a new SQL statement, registers a {@link Query} for it and returns the first
     * protocol response.
     *
     * @param str the SQL text from the request body
     * @param str2 value of the {@code X-Forwarded-Proto} header, may be null
     * @throws WebApplicationException with 400 BAD_REQUEST when the statement is null or empty
     */
    @POST
    @Produces({"application/json"})
    public Response postStatement(String str, @HeaderParam("X-Forwarded-Proto") String str2, @Context HttpServletRequest httpServletRequest, @Context HttpHeaders httpHeaders, @Context UriInfo uriInfo) {
        if (Strings.isNullOrEmpty(str)) {
            throw badRequest(Response.Status.BAD_REQUEST, "SQL statement is empty");
        }

        String remoteAddress = httpServletRequest.getRemoteAddr();
        Optional<Identity> identity = Optional.ofNullable((Identity) httpServletRequest.getAttribute(HttpRequestSessionContext.AUTHENTICATED_IDENTITY));
        SessionContext sessionContext = new HttpRequestSessionContext(this.forwardedHeaderSupport, httpHeaders.getRequestHeaders(), remoteAddress, identity, this.groupProvider);

        Query query = new Query(str, sessionContext, this.dispatchManager);
        this.queries.put(query.getQueryId(), query);

        // NOTE(review): clearing the attribute presumably signals that the identity is now owned
        // by the Query (which destroys it in Query.destroy()) - confirm against the auth filter.
        httpServletRequest.setAttribute(HttpRequestSessionContext.AUTHENTICATED_IDENTITY, null);

        return Response.ok(query.getQueryResults(query.getLastToken(), uriInfo, str2)).build();
    }

    /**
     * Long-polls the queued status of a query and completes the suspended response with the
     * current {@link QueryResults}.
     *
     * <p>The request waits for the query's dispatch future for at most the smaller of
     * {@code MAX_WAIT_TIME} and {@code maxWait} (a null {@code maxWait} sorts last, so the
     * maximum applies); on timeout the wait simply resolves and results are produced anyway.
     */
    @GET
    @Produces({"application/json"})
    @Path("queued/{queryId}/{slug}/{token}")
    public void getStatus(@PathParam("queryId") QueryId queryId, @PathParam("slug") String str, @PathParam("token") long j, @QueryParam("maxWait") Duration duration, @HeaderParam("X-Forwarded-Proto") String str2, @Context UriInfo uriInfo, @Suspended AsyncResponse asyncResponse) {
        Query query = getQuery(queryId, str, j);

        // Step 1: wait for dispatch, bounded by the effective timeout.
        ListenableFuture<?> submission = query.waitForDispatched();
        Duration effectiveWait = WAIT_ORDERING.min(MAX_WAIT_TIME, duration);
        ListenableFuture<?> dispatchedOrTimedOut = MoreFutures.addTimeout(submission, () -> null, effectiveWait, this.timeoutExecutor);

        // Step 2: once the wait resolves, compute the protocol results on the response executor.
        ListenableFuture<QueryResults> results = Futures.transform(
                dispatchedOrTimedOut,
                ignored -> query.getQueryResults(j, uriInfo, str2),
                this.responseExecutor);

        // Step 3: wrap the results in a 200 OK and bind them to the suspended response.
        ListenableFuture<Response> response = Futures.transform(
                results,
                queryResults -> Response.ok(queryResults).build(),
                MoreExecutors.directExecutor());
        AsyncResponseHandler.bindAsyncResponse(asyncResponse, response, this.responseExecutor);
    }

    /**
     * Cancels a queued query identified by id, slug and token.
     *
     * @return an empty 204 No Content response
     * @throws WebApplicationException with 404 NOT_FOUND when the query is unknown or the slug is invalid
     */
    @DELETE
    @Path("queued/{queryId}/{slug}/{token}")
    @Produces({"application/json"})
    public Response cancelQuery(@PathParam("queryId") QueryId queryId, @PathParam("slug") String str, @PathParam("token") long j) {
        Query target = getQuery(queryId, str, j);
        target.cancel();
        return Response.noContent().build();
    }

    /**
     * Looks up a registered query and validates the caller's slug for the given token.
     *
     * @throws WebApplicationException with 404 NOT_FOUND when no such query is registered or
     *         the slug does not validate in the queued-query context
     */
    private Query getQuery(QueryId queryId, String str, long j) {
        Query candidate = queries.get(queryId);
        if (candidate != null && candidate.getSlug().isValid(Slug.Context.QUEUED_QUERY, str, j)) {
            return candidate;
        }
        throw badRequest(Response.Status.NOT_FOUND, "Query not found");
    }

    /** Builds the {@code ui/query.html?<queryId>} URI from the request URI, using the effective scheme. */
    private static URI getQueryHtmlUri(QueryId queryId, UriInfo uriInfo, String str) {
        UriBuilder builder = uriInfo.getRequestUriBuilder();
        builder = builder.scheme(getScheme(str, uriInfo));
        builder = builder.replacePath("ui/query.html");
        builder = builder.replaceQuery(queryId.toString());
        return builder.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the queued-status URI {@code /v1/statement/queued/<queryId>/<slug>/<token>} from the
     * base URI, using the effective scheme and an empty query string.
     */
    public static URI getQueuedUri(QueryId queryId, Slug slug, long j, UriInfo uriInfo, String str) {
        UriBuilder builder = uriInfo.getBaseUriBuilder();
        builder = builder.scheme(getScheme(str, uriInfo));
        builder = builder.replacePath("/v1/statement/queued/");
        builder = builder.path(queryId.toString());
        builder = builder.path(slug.makeSlug(Slug.Context.QUEUED_QUERY, j));
        builder = builder.path(String.valueOf(j));
        builder = builder.replaceQuery("");
        return builder.build();
    }

    /** Returns the forwarded scheme when one is supplied, otherwise the scheme of the request URI. */
    private static String getScheme(String str, @Context UriInfo uriInfo) {
        if (!Strings.isNullOrEmpty(str)) {
            return str;
        }
        return uriInfo.getRequestUri().getScheme();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assembles a {@link QueryResults} for a query that is still queued or has failed.
     *
     * <p>The state is FAILED when an error is present and QUEUED otherwise; columns, data and
     * the other optional result fields are left null.
     *
     * @param uri the next URI for the client, may be null
     * @param duration elapsed time reported in the stats
     * @param duration2 queued time reported in the stats
     */
    public static QueryResults createQueryResults(QueryId queryId, URI uri, Optional<QueryError> optional, UriInfo uriInfo, String str, Duration duration, Duration duration2) {
        QueryState state = optional.isPresent() ? QueryState.FAILED : QueryState.QUEUED;

        String id = queryId.toString();
        URI infoUri = getQueryHtmlUri(queryId, uriInfo, str);
        StatementStats stats = StatementStats.builder()
                .setState(state.toString())
                .setQueued(state == QueryState.QUEUED)
                .setElapsedTimeMillis(duration.toMillis())
                .setQueuedTimeMillis(duration2.toMillis())
                .build();

        return new QueryResults(
                id,
                infoUri,
                (URI) null,
                uri,
                (List) null,
                (List) null,
                stats,
                optional.orElse(null),
                ImmutableList.of(),
                (String) null,
                (Long) null);
    }

    /**
     * Creates a {@link WebApplicationException} carrying a plain-text response with the given
     * status and message.
     *
     * <p>The exception is returned rather than thrown so that call sites can write
     * {@code throw badRequest(...)}, which lets the compiler see the control-flow exit.
     *
     * @param status HTTP status of the response
     * @param str plain-text entity describing the problem
     * @return the exception for the caller to throw
     */
    private static WebApplicationException badRequest(Response.Status status, String str) {
        return new WebApplicationException(Response.status(status).type(MediaType.TEXT_PLAIN_TYPE).entity(str).build());
    }
}
