我有一个异步代码将目录中的所有文件上传到API。它工作正常,但我想跟踪哪个文件已上传并想要记录,以便我可以进行一些状态检查。
我一直关注this guide和this one。第一个为每个文件和第二个链接创建显式任务,仅讨论显示剩余任务的数量。另一方面,我想知道哪个任务已完成,将其打印到日志中并继续等待,直到没有要上传的文件为止。我在SSIS中使用脚本组件来执行此操作。
这是专门为2个文件定义的2个任务。
Task<string> uploadCounterpartFileTask = UploadFiles(year, month, filename_counterparts, accesstoken, path);
Task<string> uploadProductCategoryFileTask = UploadFiles(year, month, filename_productcategories, accesstoken, path);
var allTasks = new List<System.Threading.Tasks.Task> { uploadCounterpartFileTask, uploadProductCategoryFileTask };
while (allTasks.Any())
{
System.Threading.Tasks.Task finished = await System.Threading.Tasks.Task.WhenAny(allTasks);
if (finished == uploadCounterpartFileTask)
{
allTasks.Remove(uploadCounterpartFileTask);
var counterpartFile = await uploadCounterpartFileTask;
}
else if (finished == uploadProductCategoryFileTask)
{
allTasks.Remove(uploadProductCategoryFileTask);
var productCategoriesFile = await uploadProductCategoryFileTask;
}
}
但我现在不想这样做,因为我不知道目录中有多少文件
我想上传目录中的所有文件,如下所示。:-
string[] fileSystemEntries = Directory.GetFileSystemEntries(path);
var tasks = fileSystemEntries.OrderBy(s => s).Select(
fileName => UploadFiles(year, month, Path.GetFileName(fileName), accesstoken, path));
while (tasks.Any())
{
var finished = await System.Threading.Tasks.Task.WhenAny(tasks);
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "File Uploaded" + finished.Result, string.Empty, 0, ref fireAgain);
tasks.Remove(finished);
}
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "All Files Uploaded", string.Empty, 0, ref fireAgain);
我正在使用finished.Result,因为MSDN上的文档声明Result给出了在whenAny之后完成的任务。我知道WhenAny给出第一个,因此我循环(类似于MSDN上的示例)。
您可以看到Dts.Events.FireInformation,因为它位于SSIS中的脚本任务中。
我希望它会在日志中说出类似的内容。
文件已上传:Counterparts.avro
文件已上传:ProductCategorties.avro
文件已上传:SomeOtherFile.avro
已上传所有文件。
但是,虽然文件上传没有问题,但是没有打印任何语句。
我如何让它们打印?我只是不想对所有文件都有一个通知,我真的想要获得上传哪个文件的状态。
编辑:在回复其中一条评论时,这是UploadFiles的代码。基本上它将文件上传到API,每个文件会调用3个REST调用。
private async Task<string> UploadFiles(string year, string month, string filename, string accesstoken, string path)
{
//Initial Put
var method = new HttpMethod("PUT");
string url = String.Format("https://someurl/part1/{0}/{1}/{2}?resource=file&recursive=True", year, month, filename);
var request = new HttpRequestMessage(method, url)
{
Content = null
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var initput = await client.SendAsync(request);
//Append Data
url = String.Format("https://someurl/part1/{0}/{1}/{2}?action=append&position=0", year, month, filename);
string SourceFile = path;
var content = new MultipartFormDataContent();
string filenamefullyqualified = path + filename;
Stream fs = System.IO.File.Open(filenamefullyqualified, FileMode.Open, FileAccess.Read, FileShare.None);
content.Add(CreateFileContent(fs, filename, "text/plain"));
method = new HttpMethod("PATCH");
request = new HttpRequestMessage(method, url)
{
Content = content
};
//long filelength = new System.IO.FileInfo(filenamefullyqualified).Length;
request.Headers.Add("Authorization", "Bearer " + accesstoken);
var response = await client.SendAsync(request);
long? position = request.Content.Headers.ContentLength;
//Flush Data
url = String.Format("https://someurl/part1/{0}/{1}/{2}?action=flush&position={3}", year, month, filename, position.ToString());
request = new HttpRequestMessage(method, url)
{
Content = null
};
request.Headers.Add("Authorization", "Bearer " + accesstoken);
response = await client.SendAsync(request);
return filename;
}
编辑2:回应评论。请注意,这是我尝试的示例代码,用于演示此问题。
public async void Main()
{
bool fireAgain = true;
HttpClientHandler httpClientHandler = new HttpClientHandler()
{
Proxy = new WebProxy("http://127.0.0.1:8888")
{
UseDefaultCredentials = true,
BypassProxyOnLocal = true
},
UseDefaultCredentials = true
};
string year = "2019";
string month = "02";
string path = "D:\\SomeFolder\\AVRO\\";
client = new HttpClient(handler: httpClientHandler, disposeHandler: true);
string resp = getBearerToken().GetAwaiter().GetResult();
var ser = new JavaScriptSerializer();
var result = ser.DeserializeObject(resp);
Dictionary<string, object> BearerTokenDetails = new Dictionary<string, object>();
BearerTokenDetails = (Dictionary<string, object>)(result);
string accesstoken = BearerTokenDetails["access_token"].ToString();
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "Access Code: " + accesstoken, string.Empty, 0, ref fireAgain);
string[] fileSystemEntries = Directory.GetFileSystemEntries(path);
var tasks = fileSystemEntries.OrderBy(s => s).Select(
fileName => UploadFiles(year, month, Path.GetFileName(fileName), accesstoken, path));
await System.Threading.Tasks.Task.WhenAll(tasks);
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "All Files Uploaded", string.Empty, 0, ref fireAgain);
}
你会看到我定义为async的getBearerToken方法,但后来使用GetResult将其作为阻塞调用,因为这需要先发生。此外,请注意 - 您在Bearer Token之后看到的第一个FireInfo,仅在我将其设置为阻塞调用后才能工作(GetResult在我阅读时执行此操作)。
假设FireInformation
按预期工作,您的代码可以大大简化。
您可以在UploadFiles
中记录每个任务的完成情况:
private async Task<string> UploadFiles(string year, string month, string filename, string accesstoken, string path)
{
// stuff
response = await client.SendAsync(request);
// Log task finished
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "File Uploaded" + filename, string.Empty, 0, ref fireAgain);
return filename;
}
方法的结尾是文件上传和任务完成的时刻。无需while循环并从阵列中手动删除任务。
它认为如果仅用于记录,你甚至不需要再返回文件名
要记录所有已完成的消息,只需在Task.WhenAll
之后写一个日志文件:
string[] fileSystemEntries = Directory.GetFileSystemEntries(path);
var tasks = fileSystemEntries.OrderBy(s => s).Select(
fileName => UploadFiles(year, month, Path.GetFileName(fileName), accesstoken, path));
await Task.WhenAll(tasks);
Dts.Events.FireInformation(0, "Script Task for uploading AVRO files", "All Files Uploaded", string.Empty, 0, ref fireAgain);
现在,由于您在控制台中运行,您有两个选项,使用public async Task Main()
作为方法签名(需要c#7.1,请参阅this)或将await Task.WhenAll(tasks);
更改为Task.WaitAll(tasks);
否则应用程序将不会等待您的代码完成,因此没有输出显示。
另外一个选项是在Console.ReadKey();
方法的最后添加行Main
并等待输出显示。