I created this to test parallel extraction:
    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {
        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName, entry.FullName);
            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }
    }
And the following unit test to exercise it:
    [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        // Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }
Everything works with MaxDegreeOfParallelism = 1, but it fails with 2:
Test Name: ExtractTestAsync
Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome: Failed
Test Duration: 0:00:02.4138753
Result Message:
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception:
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:
at System.IO.Compression.Inflater.Decode()
at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
at System.IO.Stream.CopyTo(Stream destination)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
Here is my own way of running it in parallel, which does not work either :) Remember to handle the exception in ContinueWith.
    public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
    {
        int MaxDegreeOfParallelism = 2;
        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                semaphore.WaitOne();
                var task = Task.Run(() =>
                {
                    var path = Path.Combine(folder.FullName, entry.FullName);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    entry.ExtractToFile(path);
                });
                task.ContinueWith(handle =>
                {
                    try
                    {
                        //do any cleanup/post processing
                    }
                    finally
                    {
                        // Release the semaphore so the next thing can be processed
                        semaphore.Release();
                    }
                });
            }
            while (MaxDegreeOfParallelism-- > 0)
                semaphore.WaitOne(); //Wait here until the last task completes.
        }
    }
And here is the async version:
    public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
    {
        return Task.Factory.StartNew(() =>
        {
            int MaxDegreeOfParallelism = 50;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {
                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    semaphore.WaitOne();
                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName, entry.FullName);
                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    }, TaskContinuationOptions.AttachedToParent); // the outer task will wait for all.
                }
            }
        });
    }
handle.Exception holds the following exceptions:
{"Block length does not match with its complement."}
[0] = {"A local file header is corrupt."}
I need to find out whether ZipFile is thread safe.
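The ContinueWith continuations in the two semaphore versions release the semaphore but never inspect handle.Exception, so a failed extraction goes unnoticed by the caller. Below is a minimal sketch of the same throttling idea that lets failures propagate, assuming SemaphoreSlim and Task.WhenAll are acceptable substitutes for Semaphore and ContinueWith. Throttle and RunAsync are hypothetical names, not from the question. This only changes how failures surface; it does not by itself make a shared ZipArchive safe to read from several threads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Hypothetical helper: runs each work item on the thread pool with at
    // most maxDegreeOfParallelism items in flight at any time.
    public static async Task RunAsync(IEnumerable<Action> work, int maxDegreeOfParallelism)
    {
        using (var gate = new SemaphoreSlim(maxDegreeOfParallelism, maxDegreeOfParallelism))
        {
            var tasks = new List<Task>();
            foreach (var action in work)
            {
                await gate.WaitAsync();          // wait for a free slot
                tasks.Add(Task.Run(() =>
                {
                    try { action(); }
                    finally { gate.Release(); }  // free the slot even on failure
                }));
            }
            // Completes once every item has finished; awaiting it rethrows
            // the first failure instead of silently dropping it.
            await Task.WhenAll(tasks);
        }
    }
}
```

A caller would pass one Action per entry and await the returned task, so an InvalidDataException thrown inside a worker reaches the unit test directly.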
Best answer
Disclaimer: this is only a proof of concept.
Replacing ZipFile.OpenRead with ParallelZipFile.OpenRead in the code samples makes all 4 unit tests pass.
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Diagnostics;
    using System.IO.Compression;

    public class ParallelZipFile
    {
        public static ParallelZipArchive OpenRead(string path)
        {
            return new ParallelZipArchive(ZipFile.OpenRead(path), path);
        }
    }

    public class ParallelZipArchive : IDisposable
    {
        internal ZipArchive _archive;
        internal string _path;
        // Pool of idle readers; each reader is used by one thread at a time.
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

        public ParallelZipArchive(ZipArchive zip, string path)
        {
            _path = path;
            _archive = zip;
            FreeReaders.Enqueue(zip);
        }

        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
        {
            get
            {
                var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                int i = 0;
                foreach (var entry in _archive.Entries)
                    list.Add(new ParallelZipArchiveEntry(i++, entry, this));
                return new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
            }
        }

        public void Dispose()
        {
            // Close every pooled reader; assumes all extractions have finished.
            ZipArchive archive;
            while (FreeReaders.TryDequeue(out archive))
                archive.Dispose();
        }
    }

    public class ParallelZipArchiveEntry
    {
        private ParallelZipArchive _parent;
        private int _entry;
        public string Name { get; set; }
        public string FullName { get; set; }

        public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
        {
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;
        }

        public void ExtractToFile(string path)
        {
            ZipArchive reader;
            Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));
            // Borrow an idle reader, or open another one over the same file.
            if (!_parent.FreeReaders.TryDequeue(out reader))
                reader = ZipFile.OpenRead(_parent._path);
            try
            {
                reader.Entries[_entry].ExtractToFile(path);
            }
            finally
            {
                // Return the reader even if extraction throws, so Dispose can close it.
                _parent.FreeReaders.Enqueue(reader);
            }
        }
    }
    [TestClass]
    public class ZipFileTests
    {
        [ClassInitialize()]
        public static void PreInitialize(TestContext context)
        {
            if (Resources.LocalExtractFolderTruth.Exists)
                Resources.LocalExtractFolderTruth.Delete(true);
            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
        }

        [TestInitialize()]
        public void InitializeTests()
        {
            if (Resources.LocalExtractFolder.Exists)
                Resources.LocalExtractFolder.Delete(true);
        }

        [TestMethod]
        public void ExtractTest()
        {
            Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

        [TestMethod]
        public async Task ExtractAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

        [TestMethod]
        public void ExtractSemaphoreTest()
        {
            Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

        [TestMethod]
        public async Task ExtractSemaphoreAsyncTest()
        {
            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }
    }
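The reader pool in the answer comes down to one rule: no two threads read through the same ZipArchive at the same time. Here is a shorter sketch of that rule, assuming the Parallel.For overload with per-worker local state is acceptable in place of the pool. PerWorkerZipExtractor and Extract are hypothetical names, not part of the answer; the sketch is not benchmarked, and it trusts the entry names inside the archive.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

public static class PerWorkerZipExtractor
{
    // Hypothetical helper: every worker task opens its own ZipArchive over
    // the same file, so no archive instance is shared between threads.
    public static void Extract(string zipPath, string targetFolder, int maxDegreeOfParallelism)
    {
        int entryCount;
        using (var probe = ZipFile.OpenRead(zipPath))
            entryCount = probe.Entries.Count;

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        Parallel.For(0, entryCount, options,
            () => ZipFile.OpenRead(zipPath),           // local state: a private archive
            (index, loopState, archive) =>
            {
                var entry = archive.Entries[index];
                if (entry.Name.Length > 0)             // skip directory entries
                {
                    var path = Path.Combine(targetFolder, entry.FullName);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    entry.ExtractToFile(path, true);   // overwrite existing files
                }
                return archive;                        // hand the archive to the next iteration
            },
            archive => archive.Dispose());             // close the private archive when the worker ends
    }
}
```

Entries are addressed by index because the central directory is read in the same order by every archive opened over the same file, which is the same assumption the answer's _entry index relies on.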
Regarding "c# - Am I doing something wrong or is it not possible to extract zip files in parallel?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/16199557/