.NET InsertMediaUpload 将 CSV 上传到 BigQuery 时出现奇怪问题

问题描述 投票:0回答:2

我正在使用 .NET 客户端 API,通过 IUploadProgress progress = insertMediaUpload.Upload() 将 CSV 上传到 BigQuery。基本步骤如下:

1. 提交上传作业;
2. 获取作业状态(待处理、正在运行、已完成……);
3. 如果 BigQuery 返回任何错误,则打印这些错误并抛出异常,以便后续处理。

下面的代码并没有完全达到我想要的效果,我希望有人可以帮助我改进它。

具体来说,发生了几个奇怪的代码行为:
1. 即使对同一个故意构造成会失败的 CSV 运行相同的代码,UploadOnResponseReceived() 中解析出的 BQ 错误消息有时会打印出来,有时却不会。为什么?
2. IUploadProgress 的值似乎与 UploadOnResponseReceived() 的行为相关:如果我在 UploadOnResponseReceived 中什么都不做,progress.Status 总是 Completed;如果 UploadOnResponseReceived 抛出异常,progress.Status 就会是 Failed。
3. 当 progress.Status 为 Failed 时,我无法拿到 UploadOnResponseReceived 抛出的异常。我确实需要获取这个异常,该怎么办?

 /// <summary>
 /// Uploads a local CSV file into a BigQuery table through a resumable media upload
 /// (JobsResource.InsertMediaUpload) and reports whether the upload succeeded.
 /// </summary>
 /// <param name="dataset">Destination dataset id.</param>
 /// <param name="tableId">Destination table id.</param>
 /// <param name="filePath">Path of the CSV file to upload; it is opened read-only.</param>
 /// <param name="schema">Schema of the destination table.</param>
 /// <param name="createDisposition">
 /// Despite its name, this value is sent as the load job's WriteDisposition;
 /// the CreateDisposition is always "CREATE_IF_NEEDED".
 /// </param>
 /// <param name="delimiter">CSV field delimiter.</param>
 /// <returns>
 /// false when the upload ends in UploadStatus.Failed (the cause, e.g. an exception thrown by the
 /// ResponseReceived handler, is written to the console); true otherwise.
 /// </returns>
 public bool ExecuteUploadJobToTable(string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
    {
        TableReference destTable = new TableReference { ProjectId = _account.ProjectId, DatasetId = dataset, TableId = tableId };

        JobConfigurationLoad configLoad = new JobConfigurationLoad
        {
            Schema = schema,
            DestinationTable = destTable,
            Encoding = "ISO-8859-1",
            CreateDisposition = "CREATE_IF_NEEDED",
            WriteDisposition = createDisposition,
            FieldDelimiter = delimiter.ToString(),
            AllowJaggedRows = true,
            SourceFormat = "CSV"
        };

        JobConfiguration config = new JobConfiguration { Load = configLoad };

        Job job = new Job { Configuration = config };

        // The job id is generated client-side so the same job can be polled by id afterwards.
        JobReference jobRef = new JobReference
        {
            JobId = GenerateJobID("Upload"),
            ProjectId = _account.ProjectId
        };
        job.JobReference = jobRef;

        bool isSuccess = true;
        // Read-only open: FileMode.Open alone requests read/write access, which fails for
        // read-only files even though the upload only ever reads the stream.
        using (FileStream fileStream = File.OpenRead(filePath))
        {
            JobsResource.InsertMediaUpload insertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService, job, job.JobReference.ProjectId, stream: fileStream, contentType: "application/octet-stream");
            insertMediaUpload.ProgressChanged += UploadOnProgressChanged;
            insertMediaUpload.ResponseReceived += UploadOnResponseReceived;

            Console.WriteLine(string.Format("start {0}", jobRef.JobId));
            // The client library reports upload failures, including an exception thrown by a
            // ResponseReceived handler, through the returned progress (Status == Failed and
            // progress.Exception) rather than by throwing out of Upload().
            IUploadProgress progress = insertMediaUpload.Upload();
            if (progress.Status == UploadStatus.Failed)
            {
                isSuccess = false;
                Console.WriteLine("Upload failed: " + progress.Exception);
            }
        }
        Console.WriteLine(isSuccess);
        return isSuccess;
    }

    /// <summary>
    /// ProgressChanged handler: writes the current upload status followed by the number of
    /// bytes sent so far to the console, separated by a single space.
    /// </summary>
    /// <param name="process">Progress snapshot supplied by the upload.</param>
    private void UploadOnProgressChanged(IUploadProgress process) =>
        Console.WriteLine($"{process.Status} {process.BytesSent}");

    /// <summary>
    /// ResponseReceived handler for the media upload: waits (via PollUntilJobDone, pausing 5 s
    /// between polls) for the returned load job to reach DONE, then throws if BigQuery reported errors.
    /// </summary>
    /// <remarks>
    /// Throwing from this handler makes the upload's IUploadProgress report Failed; otherwise it
    /// reports Completed. The exception does not escape Upload() — read it from IUploadProgress.Exception.
    /// </remarks>
    /// <param name="job">Job resource returned by the upload request; only its JobReference is used.</param>
    /// <exception cref="BigQueryException">
    /// Polling failed with a non-retryable error, polling gave up before the job reached DONE,
    /// or the finished job reported an ErrorResult and/or entries in Errors.
    /// </exception>
    private void UploadOnResponseReceived(Job job)
    {
        Job finishedJob;
        try
        {
            finishedJob = PollUntilJobDone(job.JobReference, 5);
        }
        catch (Exception e)
        {
            Console.WriteLine("Unexcepted unretryable exception happens when poll job status");
            throw new BigQueryException("Unexcepted unretryable exception happens when poll job status", e);
        }

        // PollUntilJobDone returns null when its attempts run out before the job is DONE;
        // without this check the Status access below would throw a NullReferenceException.
        if (finishedJob == null)
        {
            string timeoutMessage = "Job " + job.JobReference.JobId + " was not DONE when polling gave up";
            Console.WriteLine(timeoutMessage);
            throw new BigQueryException(timeoutMessage);
        }

        StringBuilder errorMessageBuilder = new StringBuilder();
        ErrorProto fatalError = finishedJob.Status.ErrorResult;
        IList<ErrorProto> errors = finishedJob.Status.Errors;
        if (fatalError != null)
        {
            errorMessageBuilder.AppendLine("Job failed while writing to Bigquery. " + fatalError.Reason + ": " + fatalError.Message +
                      " at " + fatalError.Location);
        }
        // NOTE(review): BigQuery can list non-fatal problems in Errors for a job that still
        // succeeded; this handler deliberately treats any entry as a failure.
        if (errors != null)
        {
            foreach (ErrorProto error in errors)
            {
                errorMessageBuilder.AppendLine("Error: [REASON] " + error.Reason + " [MESSAGE] " + error.Message +
                                               " [LOCATION] " + error.Location);
            }
        }
        if (errorMessageBuilder.Length > 0)
        {
            Console.WriteLine(errorMessageBuilder.ToString());
            throw new BigQueryException(errorMessageBuilder.ToString());
        }
        Console.WriteLine("upload should be successful");
    }

    /// <summary>
    /// Polls BigQuery for the job identified by <paramref name="jobReference"/> until its state is
    /// "DONE", making at most 10 attempts.
    /// </summary>
    /// <param name="jobReference">Project id and job id of the job to poll.</param>
    /// <param name="pauseSeconds">Seconds to wait after a poll that found the job not yet DONE.</param>
    /// <returns>The finished job, or null if it was still not DONE after the last attempt.</returns>
    /// <remarks>
    /// A failed poll is wrapped in a BigQueryException. If that reports IsTemporary, the poll is retried
    /// after a delay that starts at 1 s and doubles every attempt, plus up to 1 s of random jitter.
    /// Any other failure is rethrown unchanged.
    /// </remarks>
    private Job PollUntilJobDone(JobReference jobReference, int pauseSeconds)
    {
        const int maxAttempts = 10;
        int retryDelayMs = 1000;
        int attempt = 0;

        while (attempt < maxAttempts)
        {
            try
            {
                Job current = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
                Console.WriteLine(jobReference.JobId + ": " + current.Status.State);
                if (current.Status.State.Equals("DONE"))
                {
                    return current;
                }
                // Not finished yet: wait before asking again to keep API traffic low.
                Thread.Sleep(pauseSeconds * 1000);
            }
            catch (Exception ex)
            {
                var wrapped = new BigQueryException(ex.Message, ex);
                if (!wrapped.IsTemporary)
                {
                    throw;
                }
                int delayMs = retryDelayMs + Random.Next(1000);
                Console.WriteLine("pollUntilJobDone job execute failed. Sleeping {0} ms before retry", delayMs);
                Thread.Sleep(delayMs);
            }

            retryDelayMs *= 2;
            attempt++;
        }

        return null;
    }
c# google-bigquery
2个回答
2
投票

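关于您的"如何捕获异常"问题:回调似乎是在另一个线程上异步执行的。如果回调抛出异常,它会被调用该回调的框架捕获。具体到 InsertMediaUpload,客户端库会捕获上传过程中(包括 ResponseReceived 回调中)抛出的异常,将 progress.Status 设为 Failed,并把该异常保存在 progress.Exception 属性中,因此可以在 Upload() 返回后通过 progress.Exception 读取它。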

在搜索类似问题时,我找到了以下可能对您有帮助的答案:《捕获异步回调中抛出的异常》;另外,《跟踪 WebClient 的上传进度》这个问题展示了如何根据后台线程中收到的上传进度,更新另一个线程中的 UI。


0
投票

我花了数周时间试图弄清楚如何用 .NET 云客户端库获取上传进度,因为 BigQueryClient 没有公开任何相关事件。你的代码帮我拼凑出了所需的方案;虽然已经过去 9 年多了,我还是想回馈一下。

使用 BQ 时分为两个阶段:第一阶段是上传作业数据,第二阶段是查询作业状态。这两者必须分开处理。我需要上传 700MB 以上的大文件,而简单的 BigQueryClient.UploadJSON() 方法会一直阻塞到上传完成,期间没有任何进度信息,这并不理想。我的做法是先上传文件并获取进度状态;上传完成后,再每隔 X 秒自行轮询一次作业并处理结果。两个阶段都有各自需要处理的异常。下面是完整的函数,供后来者参考。

Imports Google.Api.Gax
Imports Google.Apis.Bigquery.v2
Imports Google.Cloud.BigQuery.V2
Imports System.Reflection

''' <summary>
''' Uploads newline-delimited JSON held in memStream to a BigQuery table through a resumable
''' InsertMediaUpload (so progress can be reported via UploadOnProgressChanged). It then polls the
''' created load job for about 60 seconds and reports the outcome through MessageEvents.
''' </summary>
''' <param name="datasetId">Destination dataset id; "my_dataset_id_in_bq" is used when empty.</param>
''' <param name="tableId">Destination table id; required.</param>
''' <param name="memStream">JSON payload; must be non-empty. It is rewound to the beginning before upload.</param>
''' <returns>
''' False when the input is invalid, the upload fails, or the finished job reports a fatal
''' Status.ErrorResult. True otherwise, including when the monitor gives up after ~60 seconds
''' while the job is still pending or running.
''' </returns>
Public Function UploadJSON(ByVal datasetId As String, ByVal tableId As String, ByRef memStream As System.IO.MemoryStream) As Boolean

    'Set default dataset
    If String.IsNullOrEmpty(datasetId) Then datasetId = "my_dataset_id_in_bq"

    'Validation: nothing to upload, or nowhere to put it
    If IsNothing(memStream) OrElse memStream.Length() <= 0 OrElse String.IsNullOrEmpty(tableId) Then Return False

    'Decrypt once; the same project id is used for the table, the job and the client.
    Dim projectId As String = crypt.DecryptData(BQCredential.ProjectId)

    'Create BQ client base classes
    Dim destTable As Data.TableReference = New Data.TableReference With {.ProjectId = projectId, .DatasetId = datasetId, .TableId = tableId}
    Dim configLoad As Data.JobConfigurationLoad = New Data.JobConfigurationLoad With {.Schema = GetSchemaFromObject(New CompletedContact()), .DestinationTable = destTable, .Encoding = "UTF-8", .SourceFormat = "NEWLINE_DELIMITED_JSON"}
    Dim config As Data.JobConfiguration = New Data.JobConfiguration With {.Load = configLoad}
    Dim job As Data.Job = New Data.Job With {.Configuration = config}

    Dim JobId As String = $"job_{System.Guid.NewGuid().ToString()}"
    Dim jobRef As Data.JobReference = New Data.JobReference With {.JobId = JobId, .ProjectId = projectId}
    job.JobReference = jobRef

    'Create BQ client
    Dim BQClient As BigQueryClient
    If BQCredential.ServiceAccount.Length > 0 Then
        Dim clientBuilder As New BigQueryClientBuilder()
        clientBuilder.ProjectId = projectId
        clientBuilder.JsonCredentials = crypt.DecryptData(BQCredential.ServiceAccount)
        BQClient = clientBuilder.Build()
    Else
        BQClient = BigQueryClient.Create(projectId)
    End If

    Dim jobSucceeded As Boolean = True

    'The Finally guarantees the client is disposed on every exit path, including early returns and exceptions.
    Try
        'Store byte size for math later
        totalStreamSize = memStream.Length()
        memStream.Seek(0, IO.SeekOrigin.Begin)

        'Create an InsertMediaUpload in order to hook into progress changed event during base service upload.
        Dim insertMediaUpload As JobsResource.InsertMediaUpload = Nothing
        Dim progress As Google.Apis.Upload.IUploadProgress = Nothing
        Try
            insertMediaUpload = New JobsResource.InsertMediaUpload(BQClient.Service, job, job.JobReference.ProjectId, memStream, contentType:="application/octet-stream")
            AddHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged

            MessageEvents.Raise(EvtMsgType.INFO, $"Creating BigQuery Job: {jobRef.JobId}, Uploading data...")

            'Start elapsed timer to calculate duration and Est. completion time
            elapsedTime = DateTime.Now()
            elapsedTimer.Enabled = True
            elapsedTimer.Start()

            progress = insertMediaUpload.Upload()
        Catch ex As Exception
            MessageEvents.Raise(EvtMsgType.ERR, $"Error creating BigQuery Job: {ex.Message}")
            Return False
        Finally
            'Stop the timer and detach the handler whether or not the upload threw.
            elapsedTimer.Stop()
            elapsedTimer.Enabled = False
            If insertMediaUpload IsNot Nothing Then
                RemoveHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged
            End If
        End Try

        'Upload() reports failures (HTTP errors, exceptions raised in handlers) through the returned
        'progress instead of throwing, so the status must be checked explicitly.
        If progress.Status = Google.Apis.Upload.UploadStatus.Failed Then
            MessageEvents.Raise(EvtMsgType.ERR, $"Error uploading data for BigQuery Job {jobRef.JobId}: {If(progress.Exception?.Message, "unknown error")}")
            Return False
        End If

        Dim results As BigQueryJob = BQClient.GetJob(job.JobReference)
        MessageEvents.Raise(EvtMsgType.INFO, $"Upload complete. Polling Job Id: {results.Resource.JobReference.JobId}")

        Const sleepTime As Integer = 5                 'seconds between polls
        Const maxPolls As Integer = 60 \ sleepTime     'give up after roughly 60 seconds
        Dim BQTimeoutCounter As Integer = 0
        Dim firstPass As Boolean = False

        Do While results.State <> JobState.Done AndAlso BQTimeoutCounter < maxPolls
            Try
                BQTimeoutCounter += 1
                System.Threading.Thread.Sleep(sleepTime * 1000)

                results = BQClient.GetJob(results.Resource.JobReference)

                If results.State = JobState.Running AndAlso Not firstPass Then
                    Dim tmpOffset As DateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime)
                    MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId}, Started: {tmpOffset.LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}, Job Status: {results.Status.State}")
                    firstPass = True
                End If
            Catch ex As Exception
                'Transient polling errors are reported and the loop keeps polling.
                MessageEvents.Raise(EvtMsgType.INFO, $"Error: {ex.Message}")
            End Try
        Loop

        Select Case results.State
            Case JobState.Done
                MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId} finished")
                MessageEvents.Raise(EvtMsgType.INFO, $"Summary:")

                If Not IsNothing(results.Status.Errors) Then
                    For Each errProto As Data.ErrorProto In results.Status.Errors
                        MessageEvents.Raise(EvtMsgType.ERR, $"Reason: {errProto.Reason}, Message: {errProto.Message}")
                    Next
                End If

                'A populated ErrorResult means the job completed unsuccessfully. Entries in Errors alone
                'do not necessarily mean failure, so only ErrorResult decides the return value.
                If Not IsNothing(results.Status.ErrorResult) Then
                    MessageEvents.Raise(EvtMsgType.ERR, $"Job failed: {results.Status.ErrorResult.Message}")
                    jobSucceeded = False
                End If

                If Not IsNothing(results.Statistics) AndAlso Not IsNothing(results.Statistics.Load) Then
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Input File Count: {results.Statistics.Load.InputFiles}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Input Size: {BytesToString(results.Statistics.Load.InputFileBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Output Row Count: {results.Statistics.Load.OutputRows}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Output Size: {BytesToString(results.Statistics.Load.OutputBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Bad Record Count: {results.Statistics.Load.BadRecords}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Start Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   End Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.EndTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Execution Time: {TimeSpan.FromMilliseconds(results.Statistics.FinalExecutionDurationMs).Duration()}")
                End If

            Case JobState.Pending
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout. Job is in pending state")
            Case JobState.Running
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout")
        End Select
    Finally
        BQClient.Dispose()
    End Try

    Return jobSucceeded

End Function

''' <summary>
''' ProgressChanged handler for the media upload. It reports status, bytes sent, percent complete,
''' elapsed time and an estimated time to completion through MessageEvents. Any exception raised
''' while building the report is caught and reported as an ERR message instead of propagating.
''' </summary>
''' <param name="process">Progress snapshot supplied by the upload.</param>
Public Function UploadOnProgressChanged(ByVal process As Google.Apis.Upload.IUploadProgress)
    Try
        Dim sentBytes As Long = process.BytesSent

        'Fraction of the payload sent so far; stays 0 until the first bytes have gone out
        '(this also keeps the ETA division below away from zero).
        Dim fractionDone As Double = 0
        If sentBytes <> 0 Then fractionDone = sentBytes / totalStreamSize

        'Linear extrapolation of the remaining time from the average rate so far.
        Dim elapsed As TimeSpan = DateTime.Now.Subtract(elapsedTime)
        Dim eta As TimeSpan = TimeSpan.Zero
        If elapsed.TotalSeconds <> 0 AndAlso fractionDone <> 0 Then
            eta = TimeSpan.FromSeconds(CDbl((elapsed.TotalSeconds / fractionDone) - elapsed.TotalSeconds))
        End If

        'Raw "0" before anything is sent, otherwise the human-readable size.
        Dim sentText As String = "0"
        If sentBytes <> 0 Then sentText = BytesToString(sentBytes)

        MessageEvents.Raise(EvtMsgType.INFO, $"Status: {process.Status}, Sent: {sentText}/{BytesToString(totalStreamSize)}, Progress: {Math.Floor(fractionDone * 100)}%, Elapsed Time: {elapsed.ToString("hh\:mm\:ss")}, Est. Completion: {eta.ToString("hh\:mm\:ss")}")

    Catch ex As Exception
        MessageEvents.Raise(EvtMsgType.ERR, $"{ex.Message}")
    End Try

End Function
© www.soinside.com 2019 - 2024. All rights reserved.