package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.class */
/**
 * Base class for bounded sources that read a BigQuery table by first running an extract job
 * that exports the table as Avro files, and then exposing one {@link AvroSource} per exported file.
 *
 * <p>Subclasses decide which table is exported ({@link #getTableToExtract}) and how temporary
 * resources are released afterwards ({@link #cleanupTempResource}).
 *
 * <p>The source cannot be read directly: {@link #createReader} always throws, so callers must
 * go through {@link #split}.
 *
 * @param <T> type of the elements produced by the parse function
 */
public abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);

    /** Retry budget handed to {@code JobService.pollJob} while waiting for the extract job. */
    protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;

    /** Identifier mixed into the extract job id and into the temporary extract location. */
    protected final String stepUuid;

    /** Factory for the BigQuery dataset and job services used by this source. */
    protected final BigQueryServices bqServices;

    /**
     * Result of the first successful {@link #split} call. Transient, so it is not carried along
     * when the source is serialized.
     */
    private transient List<BoundedSource<T>> cachedSplitResult;

    // Field modifiers are intentionally left untouched: they participate in the default
    // serialVersionUID of this Serializable hierarchy.
    private SerializableFunction<SchemaAndRecord, T> parseFn;
    private Coder<T> coder;

    /** Pairs the schema of the extracted table with the files produced by the extract job. */
    public static class ExtractResult {
        /** Schema of the table that was exported. */
        public final TableSchema schema;

        /** Files written by the extract job. */
        public final List<ResourceId> extractedFiles;

        /**
         * @param tableSchema schema of the exported table
         * @param list files written by the extract job; stored as-is, without copying
         */
        public ExtractResult(TableSchema tableSchema, List<ResourceId> list) {
            this.schema = tableSchema;
            this.extractedFiles = list;
        }
    }

    /** Serializable function that parses a JSON string into a {@link TableSchema}. */
    private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
        @Nullable
        @Override
        public TableSchema apply(@Nullable String str) {
            return BigQueryHelpers.fromJsonString(str, TableSchema.class);
        }
    }

    /**
     * Creates the source; every argument is required.
     *
     * @param str the step UUID, stored in {@link #stepUuid}
     * @param bigQueryServices services factory, stored in {@link #bqServices}
     * @param coder coder returned by {@link #getOutputCoder()}
     * @param serializableFunction function turning each extracted record (with its table schema)
     *     into an output element
     * @throws NullPointerException if any argument is {@code null}
     */
    public BigQuerySourceBase(String str, BigQueryServices bigQueryServices, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> serializableFunction) {
        this.stepUuid = Preconditions.checkNotNull(str, "stepUuid");
        this.bqServices = Preconditions.checkNotNull(bigQueryServices, "bqServices");
        this.coder = Preconditions.checkNotNull(coder, "coder");
        this.parseFn = Preconditions.checkNotNull(serializableFunction, "parseFn");
    }

    /**
     * Looks up the table selected by {@link #getTableToExtract} and runs an extract job that
     * exports it as Avro files under a temporary location derived from the pipeline's temp
     * location and {@link #stepUuid}.
     *
     * @param pipelineOptions pipeline options; viewed as {@link BigQueryOptions}
     * @return the table schema together with the files produced by the extract job
     * @throws IOException if the table does not exist or the extract job does not succeed
     * @throws Exception if a subclass hook or a BigQuery service call fails
     */
    public ExtractResult extractFiles(PipelineOptions pipelineOptions) throws Exception {
        BigQueryOptions bqOptions = pipelineOptions.as(BigQueryOptions.class);
        TableReference tableToExtract = getTableToExtract(bqOptions);
        Table table = this.bqServices.getDatasetService(bqOptions).getTable(tableToExtract);
        if (table == null) {
            throw new IOException(String.format("Cannot start an export job since table %s does not exist", BigQueryHelpers.toTableSpec(tableToExtract)));
        }

        // The locals below are computed in the same order in which the original single
        // expression evaluated its operands.
        TableSchema schema = table.getSchema();
        String extractJobId = BigQueryHelpers.getExtractJobId(BigQueryHelpers.createJobIdToken(pipelineOptions.getJobName(), this.stepUuid));
        BigQueryServices.JobService jobService = this.bqServices.getJobService(bqOptions);
        String executingProject = bqOptions.getProject();
        String extractDestinationDir = BigQueryHelpers.resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", this.stepUuid);

        List<ResourceId> extractedFiles = executeExtract(extractJobId, tableToExtract, jobService, executingProject, extractDestinationDir);
        return new ExtractResult(schema, extractedFiles);
    }

    /**
     * Runs the extract job on the first call and returns one source per extracted file; later
     * calls on the same instance return the cached list without starting another extract job.
     *
     * @param j desired bundle size in bytes; not consulted by this implementation
     * @param pipelineOptions pipeline options; viewed as {@link BigQueryOptions}
     * @return the sources created for the extracted files
     * @throws Exception if extraction, cleanup or source creation fails
     */
    @Override
    public List<BoundedSource<T>> split(long j, PipelineOptions pipelineOptions) throws Exception {
        if (this.cachedSplitResult == null) {
            ExtractResult extracted = extractFiles(pipelineOptions);
            LOG.info("Extract job produced {} files", extracted.extractedFiles.size());
            // Cleanup runs after the extract has finished and before the per-file sources are built.
            cleanupTempResource(pipelineOptions.as(BigQueryOptions.class));
            this.cachedSplitResult = Preconditions.checkNotNull(createSources(extracted.extractedFiles, extracted.schema));
        }
        return this.cachedSplitResult;
    }

    /**
     * Returns the reference of the table that {@link #extractFiles} should export.
     *
     * @param bigQueryOptions BigQuery view of the pipeline options
     */
    protected abstract TableReference getTableToExtract(BigQueryOptions bigQueryOptions) throws Exception;

    /**
     * Hook invoked by {@link #split} once the extract job has completed.
     *
     * @param bigQueryOptions BigQuery view of the pipeline options
     */
    protected abstract void cleanupTempResource(BigQueryOptions bigQueryOptions) throws Exception;

    /**
     * Always fails: this source has to be split into per-file sources before it can be read.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
        throw new UnsupportedOperationException("BigQuery source must be split before being read");
    }

    /** Performs no validation. */
    @Override
    public void validate() {
    }

    /** Returns the coder supplied at construction time. */
    @Override
    public Coder<T> getOutputCoder() {
        return this.coder;
    }

    /**
     * Starts an Avro extract job for {@code table}, waits for it to finish and returns the
     * files it produced.
     *
     * @param jobId id of the extract job to start
     * @param table table to export
     * @param jobService service used to start and poll the job
     * @param executingProject project in which the job is run
     * @param extractDestinationDir directory from which the destination URI is derived
     * @return the files produced by the job
     * @throws IOException if the job does not finish with status {@code SUCCEEDED}
     * @throws InterruptedException if interrupted while starting or polling the job
     */
    private List<ResourceId> executeExtract(String jobId, TableReference table, BigQueryServices.JobService jobService, String executingProject, String extractDestinationDir) throws InterruptedException, IOException {
        JobReference jobRef = new JobReference().setProjectId(executingProject).setJobId(jobId);
        String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
        JobConfigurationExtract extractConfig = new JobConfigurationExtract()
                .setSourceTable(table)
                .setDestinationFormat("AVRO")
                .setDestinationUris(ImmutableList.of(destinationUri));

        LOG.info("Starting BigQuery extract job: {}", jobId);
        jobService.startExtractJob(jobRef, extractConfig);
        Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);

        if (BigQueryHelpers.parseStatus(extractJob) != BigQueryHelpers.Status.SUCCEEDED) {
            // Building the message must not itself fail: if polling yielded no job (or a job
            // without a reference), report the id that was submitted instead of dereferencing null.
            // NOTE(review): assumes parseStatus and statusToPrettyString accept null — confirm.
            boolean hasReference = extractJob != null && extractJob.getJobReference() != null;
            String failedJobId = hasReference ? extractJob.getJobReference().getJobId() : jobId;
            throw new IOException(String.format("Extract job %s failed, status: %s.", failedJobId, BigQueryHelpers.statusToPrettyString(extractJob == null ? null : extractJob.getStatus())));
        }

        LOG.info("BigQuery extract job completed: {}", jobId);
        return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
    }

    /**
     * Builds one {@link AvroSource} per extracted file. Each source parses its records with the
     * configured parse function, which receives every record together with the table schema.
     *
     * @param list files produced by the extract job
     * @param tableSchema schema of the extracted table
     * @return an immutable list with one source per file, in the iteration order of {@code list}
     */
    public List<BoundedSource<T>> createSources(List<ResourceId> list, TableSchema tableSchema) throws IOException, InterruptedException {
        // Only the JSON text is captured by the wrapper below; the schema object is rebuilt from it
        // on first use and memoized.
        final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
        SerializableFunction<GenericRecord, T> recordParser = new SerializableFunction<GenericRecord, T>() {
            private Supplier<TableSchema> schema = Suppliers.memoize(
                    Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));

            @Override
            public T apply(GenericRecord genericRecord) {
                return BigQuerySourceBase.this.parseFn.apply(new SchemaAndRecord(genericRecord, this.schema.get()));
            }
        };

        List<BoundedSource<T>> avroSources = new ArrayList<>(list.size());
        for (ResourceId file : list) {
            avroSources.add(AvroSource.from(file.toString()).withParseFn(recordParser, getOutputCoder()));
        }
        return ImmutableList.copyOf(avroSources);
    }
}
