IO读写操作的TPL Dataflow实现中的内存问题

Balraj辛格

我试图使用文件IO操作实现读写操作,并将这些操作封装到其中TransformBlock,以使这些操作线程安全,而不是使用锁定机制。

但是问题是,当我尝试并行写入5个文件时,内存异常,使用此实现会阻塞UI线程。该实现在Windows Phone项目中完成。请指出此实现中的错误。

文件IO操作

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName =>
       {
           return await LoadData<T>(fName);
       }, concurrentExecutionDataFlow);

    transBlock.Post(fileName);

    result = await transBlock.ReceiveAsync();

    return result;
}

public static async Task SaveAsync<T>(T obj, string fileName)
{
    var transBlock = new TransformBlock<Tuple<T, string>, Task>
       (async tupleData =>
       {
          await SaveData(tupleData.Item1, tupleData.Item2);
       }, exclusiveExecutionDataFlow);

    transBlock.Post(new Tuple<T, string>(obj, fileName));

    await transBlock.ReceiveAsync();
}

MainPage.xaml.cs用法

private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd"
private static string fileName = string.Empty;
private List<string> DataLstSample = new List<string>();
private ObservableCollection<string> TestResults = new ObservableCollection<string>();
private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
List<Task> allTsk = new List<Task>();
private Random rand = new Random();
private string  fileNameRand
{
    get
    {
        return rand.Next(100).ToString();
    }
}

public MainPage()
{
    InitializeComponent();

    for (int i = 0; i < 5; i ++)
    {
        DataLstSample.Add((i % 2) == 0 ? data : data1);
    }

}

private void Button_Click(object sender, RoutedEventArgs e)
{
    AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
}

public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
{
    TstRst.Text = "InProgress..";
    allTsk.Clear();

    foreach(var data in DataLstSample)
    {
        var fName = fileNameRand;

        var t = Task.Run(async () =>
        {
            await AppIsolatedStore.SaveAsync<string>(data, fName);
        });

        TestResults.Add(string.Format("Writing file name: {0}, data: {1}", fName, data));
        allTsk.Add(t);
    }

    await Task.WhenAll(allTsk);

    TstRst.Text = "Completed..";
}

保存和加载异步数据

        /// <summary>
        /// Load object from file
        /// </summary>
        private static async Task<T> LoadData<T>(string fileName)
        {

            T result = default(T);

            try
            {
                if (!string.IsNullOrWhiteSpace(fileName))
                {
                    using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
                    {
                        var data = await _file.ReadTextAsync(file);

                        if (!string.IsNullOrWhiteSpace(data))
                        {
                            result = JsonConvert.DeserializeObject<T>(data);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occured while loading data : {0}", ex.Message);
            }
            finally
            {

            }

            return result;
        }


        /// <summary>
        /// Save object from file
        /// </summary>
        private static async Task SaveData<T>(T obj, string fileName)
        {
            try
            {
                if (obj != null && !string.IsNullOrWhiteSpace(fileName))
                {
                    //Serialize object with JSON or XML serializer
                    string storageString = JsonConvert.SerializeObject(obj);

                    if (!string.IsNullOrWhiteSpace(storageString))
                    {
                        //Write content to file
                        await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
                    }
                }
            }
            catch (Exception ex)
            {
                //todo: log the megatron exception in a file
                Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occured while saving the data : {0}", ex.Message);
            }
            finally
            {
            }
        }

编辑:

它具有内存异常的原因是由于我拿走的数据字符串太大的一个原因。字符串是链接:http : //1drv.ms/1QWSAsc

但是第二个问题是,如果我也添加少量数据,那么它将阻塞UI线程。代码在UI上执行任何任务吗?

虚拟机

不,您使用的并发对使用默认线程池执行任务,并使用Runmethod实例化任务,因此问题不在这里。但是这里的代码有两个主要威胁:

var transBlock = new TransformBlock<string, T>
   (async fName =>
   {
       // process file here
   }, concurrentExecutionDataFlow);

您确实不应该transBlock每次都创建其主要思想TPL Dataflow是创建一次块,然后再使用它们。因此,您应该将应用程序重构为减少实例化的块数,否则就不TPL Dataflow应该使用这种情况

代码中的另一个威胁是您将显式阻塞线程!

// Right here
await Task.WhenAll(allTsk);
TstRst.Text = "Completed..";

从同步事件处理程序中awaitasync void方法中调用for任务会阻塞线程,因为默认情况下它将捕获同步上下文首先,async void应避免其次,如果您是异步的,则应该一直保持异步,因此事件处理程序也应该是异步的。第三,您可以对您的任务使用延续来更新UI或使用当前同步上下文

因此,您的代码应如下所示:

// store the sync context in the field of your form
SynchronizationContext syncContext = SynchronizationContext.Current;

// avoid the async void :)
public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

// make event handler async - this is the only exception for the async void use rule from above
private async void Button_Click(object sender, RoutedEventArgs e)

// asynchronically wait the result without capturing the context
await Task.WhenAll(allTsk).ContinueWith(
  t => {
    // you can move out this logic to main method
    syncContext.Post(new SendOrPostCallback(o =>
        {
            TstRst.Text = "Completed..";
        }));
  }
);

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

为文件的读写操作实现File类

来自分类Dev

在Settings类中优化读写操作

来自分类Dev

无法解决操作系统中的内存管理问题

来自分类Dev

如何在Java中阻止读写和连接操作

来自分类Dev

如何在Cloud Firestore中优化读写操作?

来自分类Dev

C ++中的读写锁实现

来自分类Dev

在非IO环境中执行IO操作

来自分类Dev

Centos 7中的NIO和IO性能问题(第一次和第二次读写)

来自分类Dev

IO操作中的通过状态

来自分类Dev

IO操作中的通过状态

来自分类Dev

实现非成员 IO 操作符

来自分类Dev

使用Apache.POI读写Excel的Java内存问题

来自分类Dev

TPL Dataflow数据回收

来自分类Dev

理解操作系统和内存如何在 Unix 系统中工作的问题

来自分类Dev

是否可以在Android的同一活动中对NFC使用读写操作

来自分类Dev

使用Systemtap探测为Linux中的每个读写操作获取字节的虚拟地址位置

来自分类Dev

RTOS(实时操作系统)可以在文件系统中读写文件吗?

来自分类Dev

BoundedCapacity是否包括TPL Dataflow中当前正在处理的项目?

来自分类Dev

如何在TPL Dataflow中设置/获取/使用块的名称?

来自分类Dev

实现快速排序的内存问题

来自分类Dev

读写内存数据

来自分类Dev

C ++,读写内存,

来自分类Dev

内存读写字节

来自分类Dev

我在GUI中实现鼠标操作侦听器时遇到问题

来自分类Dev

使用TPL对网络中的所有计算机执行ping操作

来自分类Dev

使用TPL对网络中的所有计算机执行ping操作

来自分类Dev

如何在Haskell中“中断” IO操作

来自分类Dev

如何在Haskell中“中断” IO操作

来自分类Dev

TPL和内存管理