我正在使用 .NET 客户端 API(`IUploadProgress progress = insertMediaUpload.Upload()`)将 CSV 上传到 BigQuery。基本上我的流程是:
1. 提交上传作业;
2. 获取作业状态(待处理、正在运行、已完成……);
3. 如果 BigQuery 返回任何错误,则打印这些错误并抛出异常,以便后续进一步处理。
下面的代码并没有完全达到我想要的效果,我希望有人可以帮助我改进它。
具体来说,发生了几个奇怪的代码行为:
1. 即使对同一个故意构造为会失败的 CSV 运行相同的代码,在 UploadOnResponseReceived() 中解析出的 BQ 错误消息也是有时会打印、有时不会打印。为什么?
2. IUploadProgress 的值似乎与 UploadOnResponseReceived() 的行为相关:如果我在 UploadOnResponseReceived 中什么都不做,progress.Status 将始终为 “Completed”;如果 UploadOnResponseReceived 抛出异常,progress.Status 则为 “Failed”。
3. 当 progress.Status 为 “Failed” 时,我无法获取 UploadOnResponseReceived 抛出的异常。我确实需要拿到这个异常,该怎么办?
/// <summary>
/// Uploads a delimited text file to a BigQuery table as a CSV load job and reports whether the
/// media upload succeeded.
/// </summary>
/// <param name="dataset">Destination dataset id.</param>
/// <param name="tableId">Destination table id.</param>
/// <param name="filePath">Path of the local file to upload; it is opened read-only.</param>
/// <param name="schema">Schema assigned to the load job.</param>
/// <param name="createDisposition">
/// Despite its name, this value is used as the load job's <c>WriteDisposition</c>; the job's
/// <c>CreateDisposition</c> is always <c>CREATE_IF_NEEDED</c>.
/// </param>
/// <param name="delimiter">Field delimiter used in the file.</param>
/// <returns>
/// <c>false</c> when the upload reports <see cref="UploadStatus.Failed"/>, otherwise <c>true</c>.
/// </returns>
public bool ExecuteUploadJobToTable(string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
{
    TableReference destTable = new TableReference { ProjectId = _account.ProjectId, DatasetId = dataset, TableId = tableId };
    JobConfigurationLoad configLoad = new JobConfigurationLoad
    {
        Schema = schema,
        DestinationTable = destTable,
        Encoding = "ISO-8859-1",
        CreateDisposition = "CREATE_IF_NEEDED",
        WriteDisposition = createDisposition,
        FieldDelimiter = delimiter.ToString(),
        AllowJaggedRows = true,
        SourceFormat = "CSV"
    };
    JobConfiguration config = new JobConfiguration { Load = configLoad };
    Job job = new Job { Configuration = config };

    // Set job reference (mainly the job id).
    JobReference jobRef = new JobReference
    {
        JobId = GenerateJobID("Upload"),
        ProjectId = _account.ProjectId
    };
    job.JobReference = jobRef;

    bool isSuccess = true;
    // Open read-only: FileMode.Open on its own requests read/write access, which fails for
    // read-only files and is not needed just to stream the content.
    using (var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        var insertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService, job, job.JobReference.ProjectId, stream: fileStream, contentType: "application/octet-stream");
        insertMediaUpload.ProgressChanged += UploadOnProgressChanged;
        insertMediaUpload.ResponseReceived += UploadOnResponseReceived;
        Console.WriteLine($"start {jobRef.JobId}");

        IUploadProgress progress = insertMediaUpload.Upload();
        // Compare the enum value directly instead of matching on its string form.
        if (progress.Status == UploadStatus.Failed)
        {
            isSuccess = false;
            // Upload() does not rethrow; the failure cause (including an exception thrown by the
            // ResponseReceived handler) is carried on the progress object.
            if (progress.Exception != null)
            {
                Console.WriteLine(progress.Exception);
            }
        }
    }
    Console.WriteLine(isSuccess);
    return isSuccess;
}
/// <summary>
/// Progress callback for the media upload: prints the current status followed by the number of
/// bytes sent so far.
/// </summary>
/// <param name="process">Progress snapshot raised by the upload.</param>
private void UploadOnProgressChanged(IUploadProgress process)
{
    var currentStatus = process.Status;
    var bytesSoFar = process.BytesSent;
    Console.WriteLine($"{currentStatus} {bytesSoFar}");
}
/// <summary>
/// <c>ResponseReceived</c> handler for the load-job upload: polls the created job until it is DONE
/// and throws a <see cref="BigQueryException"/> describing any error BigQuery reports for it.
/// </summary>
/// <param name="job">Job resource received from the upload; only its <c>JobReference</c> is used.</param>
/// <remarks>
/// Throwing an exception here makes the upload's <c>IUploadProgress.Status</c> "Failed"; otherwise it
/// is "Completed". The exception is not rethrown out of <c>Upload()</c>; the Google.Apis upload
/// exposes it through <c>IUploadProgress.Exception</c>.
/// </remarks>
/// <exception cref="BigQueryException">
/// Polling failed, polling gave up before the job was DONE, or the finished job reports errors.
/// </exception>
private void UploadOnResponseReceived(Job job)
{
    try
    {
        job = PollUntilJobDone(job.JobReference, 5);
    }
    catch (Exception e)
    {
        Console.WriteLine("Unexcepted unretryable exception happens when poll job status");
        throw new BigQueryException("Unexcepted unretryable exception happens when poll job status", e);
    }

    // PollUntilJobDone returns null when the job is still not DONE after its last attempt.
    // Dereferencing it used to throw a NullReferenceException that failed the upload without
    // printing any BigQuery error, which is a plausible cause of error output appearing only on some runs.
    if (job == null)
    {
        const string timeoutMessage = "Job did not reach state DONE before polling gave up";
        Console.WriteLine(timeoutMessage);
        throw new BigQueryException(timeoutMessage);
    }

    var errorMessageBuilder = new StringBuilder();
    ErrorProto fatalError = job.Status.ErrorResult;
    IList<ErrorProto> errors = job.Status.Errors;
    if (fatalError != null)
    {
        errorMessageBuilder.AppendLine($"Job failed while writing to Bigquery. {fatalError.Reason}: {fatalError.Message} at {fatalError.Location}");
    }
    // NOTE(review): BigQuery may list entries in Status.Errors even for jobs that succeeded
    // (e.g. rows skipped under a bad-record allowance); this handler treats any entry as a failure.
    if (errors != null)
    {
        foreach (ErrorProto error in errors)
        {
            errorMessageBuilder.AppendLine($"Error: [REASON] {error.Reason} [MESSAGE] {error.Message} [LOCATION] {error.Location}");
        }
    }
    if (errorMessageBuilder.Length > 0)
    {
        string errorMessage = errorMessageBuilder.ToString();
        Console.WriteLine(errorMessage);
        throw new BigQueryException(errorMessage);
    }
    Console.WriteLine("upload should be successful");
}
/// <summary>
/// Polls a BigQuery job until its status state is "DONE".
/// </summary>
/// <param name="jobReference">Project id and job id of the job to poll.</param>
/// <param name="pauseSeconds">Seconds to wait between polls while the job is not yet DONE.</param>
/// <param name="maxAttempts">Maximum number of polling attempts; defaults to the previous fixed limit of 10.</param>
/// <returns>
/// The finished job, or <c>null</c> if it was not DONE within <paramref name="maxAttempts"/> attempts.
/// </returns>
/// <remarks>
/// A failed <c>Jobs.Get</c> call is retried with jittered exponential back-off when
/// <see cref="BigQueryException.IsTemporary"/> classifies it as temporary; otherwise the original
/// exception is rethrown unchanged.
/// </remarks>
private Job PollUntilJobDone(JobReference jobReference, int pauseSeconds, int maxAttempts = 10)
{
    // Back-off for transient errors: starts at 1 s (plus up to 1 s of jitter) and doubles only after
    // a transient failure. Previously it doubled on every poll, so a transient error after nine normal
    // polls would sleep for more than eight minutes.
    int backoff = 1000;
    for (int attempt = 0; attempt < maxAttempts; attempt++)
    {
        try
        {
            var pollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
            Console.WriteLine(jobReference.JobId + ": " + pollJob.Status.State);
            if (string.Equals(pollJob.Status.State, "DONE", StringComparison.Ordinal))
            {
                return pollJob;
            }
            // Pause between polls to reduce unnecessary calls to the BigQuery API and lower
            // overall application bandwidth.
            Thread.Sleep(pauseSeconds * 1000);
        }
        catch (Exception e)
        {
            var exception = new BigQueryException(e.Message, e);
            if (!exception.IsTemporary)
            {
                throw;
            }
            int sleep = backoff + Random.Next(1000);
            Console.WriteLine("pollUntilJobDone job execute failed. Sleeping {0} ms before retry", sleep);
            Thread.Sleep(sleep);
            backoff *= 2;
        }
    }
    return null;
}
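关于您的“如何捕获异常”问题:回调似乎是在另一个线程上异步执行的。如果回调抛出异常,该异常会被调用回调的框架代码捕获。对于 Upload() 而言,捕获到的异常会通过返回的 IUploadProgress 的 Exception 属性暴露出来(此时 Status 为 Failed),因此可以在 Upload() 返回后读取 progress.Exception。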
搜索类似问题时,我发现以下答案可能对您有帮助:《捕获异步回调中抛出的异常》;另外,《跟踪 WebClient 的上传进度》这个问题展示了如何根据后台线程中收到的上传进度来更新另一个线程中的 UI。
我花了数周时间试图弄清楚如何使用 .NET 云库获取上传进度,因为 BigQueryClient 没有公开任何相关事件。你的代码帮我拼凑出了我需要的东西,尽管已经过去了 9 年多,我还是想回报你的帮助。
使用 BigQuery 时分为两个阶段:第一个是作业上传,第二个是作业状态。你必须把二者分开处理。我需要上传 700MB 以上的大文件,而简单的 BigQueryClient.UploadJson() 方法会一直阻塞直到上传完成,这并不理想。我的做法是先上传文件并获取进度状态;上传完成后,再每隔 X 秒自行轮询一次作业并处理结果。两个阶段都有各自需要处理的异常。下面是我为后来者提供的完整函数。
Imports Google.Api.Gax
Imports Google.Apis.Bigquery.v2
Imports Google.Cloud.BigQuery.V2
Imports System.Reflection
''' <summary>
''' Uploads newline-delimited JSON held in memStream to a BigQuery table as a load job, reporting upload
''' progress through UploadOnProgressChanged. It then polls the job for up to 60 seconds and raises a
''' status/summary through MessageEvents.
''' </summary>
''' <param name="datasetId">Destination dataset id; "my_dataset_id_in_bq" is used when empty.</param>
''' <param name="tableId">Destination table id; must not be empty.</param>
''' <param name="memStream">JSON payload; must be non-empty. It is rewound to the beginning before uploading.</param>
''' <returns>
''' False when validation fails, the upload cannot be started, the upload reports Failed, or the job
''' finishes with a fatal ErrorResult. True when the job finished without a fatal error, or when it was
''' still pending/running at the moment the 60-second monitor stopped.
''' </returns>
Public Function UploadJSON(ByVal datasetId As String, ByVal tableId As String, ByRef memStream As System.IO.MemoryStream) As Boolean
    'Set default dataset
    If String.IsNullOrEmpty(datasetId) Then datasetId = "my_dataset_id_in_bq"

    'Validation
    If IsNothing(memStream) OrElse memStream.Length() <= 0 OrElse String.IsNullOrEmpty(tableId) Then Return False

    'Decrypt the project id once and reuse it below
    Dim projectId As String = crypt.DecryptData(BQCredential.ProjectId)

    'Create BQ client base classes
    Dim destTable As Data.TableReference = New Data.TableReference With {.ProjectId = projectId, .DatasetId = datasetId, .TableId = tableId}
    Dim configLoad As Data.JobConfigurationLoad = New Data.JobConfigurationLoad With {.Schema = GetSchemaFromObject(New CompletedContact()), .DestinationTable = destTable, .Encoding = "UTF-8", .SourceFormat = "NEWLINE_DELIMITED_JSON"}
    Dim config As Data.JobConfiguration = New Data.JobConfiguration With {.Load = configLoad}
    Dim job As Data.Job = New Data.Job With {.Configuration = config}
    Dim JobId As String = $"job_{System.Guid.NewGuid().ToString()}"
    Dim jobRef As Data.JobReference = New Data.JobReference With {.JobId = JobId, .ProjectId = projectId}
    job.JobReference = jobRef

    'Create BQ client
    Dim BQClient As BigQueryClient
    If BQCredential.ServiceAccount.Length > 0 Then
        Dim clientBuilder As New BigQueryClientBuilder()
        clientBuilder.ProjectId = projectId
        clientBuilder.JsonCredentials = crypt.DecryptData(BQCredential.ServiceAccount)
        BQClient = clientBuilder.Build()
    Else
        BQClient = BigQueryClient.Create(projectId)
    End If

    'Dispose the client on every exit path, including the early Return False statements below
    Try
        'Store byte size for math later
        totalStreamSize = memStream.Length()
        memStream.Seek(0, IO.SeekOrigin.Begin)

        'Keep a single delegate instance so RemoveHandler detaches exactly what AddHandler attached.
        'UploadOnProgressChanged only loosely matches the event signature, so each AddressOf may
        'produce a different relaxed-delegate stub.
        Dim progressHandler As Action(Of Google.Apis.Upload.IUploadProgress) = AddressOf UploadOnProgressChanged
        Dim insertMediaUpload As JobsResource.InsertMediaUpload = Nothing
        Dim progress As Google.Apis.Upload.IUploadProgress = Nothing

        'Create an InsertMediaUpload in order to hook into progress changed event during base service upload.
        Try
            insertMediaUpload = New JobsResource.InsertMediaUpload(BQClient.Service, job, job.JobReference.ProjectId, memStream, contentType:="application/octet-stream")
            AddHandler insertMediaUpload.ProgressChanged, progressHandler
            MessageEvents.Raise(EvtMsgType.INFO, $"Creating BigQuery Job: {jobRef.JobId}, Uploading data...")
            'Start elapsed timer to calculate duration and Est. completion time
            elapsedTime = DateTime.Now()
            elapsedTimer.Enabled = True
            elapsedTimer.Start()
            progress = insertMediaUpload.Upload()
        Catch ex As Exception
            MessageEvents.Raise(EvtMsgType.INFO, $"Error creating BigQuery Job: {ex.Message}")
            Return False
        Finally
            'Always stop the timer and detach the handler, even when the upload throws
            elapsedTimer.Stop()
            elapsedTimer.Enabled = False
            If insertMediaUpload IsNot Nothing Then RemoveHandler insertMediaUpload.ProgressChanged, progressHandler
        End Try

        'Upload() reports failures through the returned progress object instead of throwing,
        'so check it before asking BigQuery for a job that may never have been created.
        If progress Is Nothing OrElse progress.Status = Google.Apis.Upload.UploadStatus.Failed Then
            Dim reason As String = If(progress?.Exception?.Message, "unknown error")
            MessageEvents.Raise(EvtMsgType.ERR, $"Error uploading data for BigQuery Job {jobRef.JobId}: {reason}")
            Return False
        End If

        Dim results As BigQueryJob = BQClient.GetJob(job.JobReference)
        MessageEvents.Raise(EvtMsgType.INFO, $"Upload complete. Polling Job Id: {results.Resource.JobReference.JobId}")

        Const sleepTime As Integer = 5              'seconds between polls
        Const maxPolls As Integer = 60 \ sleepTime  '60 second monitor window
        Dim BQTimeoutCounter As Integer = 0
        Dim firstPass As Boolean = False
        Do While results.Status.State <> "DONE" AndAlso BQTimeoutCounter < maxPolls
            Try
                BQTimeoutCounter += 1
                System.Threading.Thread.Sleep(sleepTime * 1000)
                results = BQClient.GetJob(results.Resource.JobReference)
                If results.State = JobState.Running AndAlso Not firstPass Then
                    Dim tmpOffset As DateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime)
                    MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId}, Started: {tmpOffset.LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}, Job Status: {results.Status.State}")
                    firstPass = True
                End If
            Catch ex As Exception
                MessageEvents.Raise(EvtMsgType.INFO, $"Error: {ex.Message}")
            End Try
        Loop

        Dim jobFailed As Boolean = False
        Select Case results.State
            Case JobState.Done
                MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId} finished")
                MessageEvents.Raise(EvtMsgType.INFO, $"Summary:")
                'ErrorResult is populated only when the job as a whole failed
                Dim fatalError As Data.ErrorProto = results.Status.ErrorResult
                If Not IsNothing(fatalError) Then
                    jobFailed = True
                    MessageEvents.Raise(EvtMsgType.ERR, $"Job failed. Reason: {fatalError.Reason}, Message: {fatalError.Message}")
                End If
                If Not IsNothing(results.Status.Errors) Then
                    For Each Err As Data.ErrorProto In results.Status.Errors
                        MessageEvents.Raise(EvtMsgType.ERR, $"Reason: {Err.Reason}, Message: {Err.Message}")
                    Next
                End If
                If Not IsNothing(results.Statistics.Load) Then
                    MessageEvents.Raise(EvtMsgType.NONE, $" Input File Count: {results.Statistics.Load.InputFiles}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Input Size: {BytesToString(results.Statistics.Load.InputFileBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Output Row Count: {results.Statistics.Load.OutputRows}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Output Size: {BytesToString(results.Statistics.Load.OutputBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Bad Record Count: {results.Statistics.Load.BadRecords}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Start Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" End Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.EndTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Execution Time: {TimeSpan.FromMilliseconds(results.Statistics.FinalExecutionDurationMs).Duration()}")
                End If
            Case JobState.Pending
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout. Job is in pending state")
            Case JobState.Running
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout")
        End Select
        Return Not jobFailed
    Finally
        BQClient.Dispose()
    End Try
End Function
''' <summary>
''' ProgressChanged handler for the media upload. It raises an INFO message with the status, bytes
''' sent, percentage complete, elapsed time, and an estimated time to completion. Any exception is
''' reported as an ERR message instead of propagating into the upload.
''' </summary>
''' <param name="process">Progress snapshot supplied by the upload.</param>
''' <remarks>Declared as a Function for compatibility with existing callers; it always returns Nothing.</remarks>
Public Function UploadOnProgressChanged(ByVal process As Google.Apis.Upload.IUploadProgress)
    Try
        Dim sent As Long = process.BytesSent

        'Avoid divide by zero: only compute a fraction once something was sent and the total is known
        Dim currentPercent As Double = 0
        If sent <> 0 AndAlso totalStreamSize <> 0 Then currentPercent = sent / totalStreamSize

        Dim timeElapsed As TimeSpan = DateTime.Now.Subtract(elapsedTime)
        Dim remainingTime As TimeSpan = TimeSpan.Zero
        If timeElapsed.TotalSeconds <> 0 AndAlso currentPercent <> 0 Then
            'Linear extrapolation: total time = elapsed / fraction done
            remainingTime = TimeSpan.FromSeconds((timeElapsed.TotalSeconds / currentPercent) - timeElapsed.TotalSeconds)
        End If

        'Decide on the numeric byte count rather than comparing its String form to a number
        Dim BytesSent As String = "0"
        If sent <> 0 Then BytesSent = BytesToString(sent)

        MessageEvents.Raise(EvtMsgType.INFO, $"Status: {process.Status}, Sent: {BytesSent}/{BytesToString(totalStreamSize)}, Progress: {Math.Floor(currentPercent * 100)}%, Elapsed Time: {timeElapsed.ToString("hh\:mm\:ss")}, Est. Completion: {remainingTime.ToString("hh\:mm\:ss")}")
    Catch ex As Exception
        MessageEvents.Raise(EvtMsgType.ERR, $"{ex.Message}")
    End Try
    Return Nothing
End Function