Question regarding implementing an IPagedSourceProviderAsync provider #3

Open
sgdowney opened this issue Jun 18, 2015 · 12 comments

@sgdowney

I've implemented an IPagedSourceProviderAsync for reading large text files. The provider runs a task that reads the file to create pages of data to be used by the GetItemsAtAsync task. The provider periodically raises an event to update the total row count so that more records can be displayed. This event is captured by my view model which does a ResetAsync on the collection. Is there a better way to notify the collection that the provider's count property has been updated so that the collection can update itself?

@anagram4wander
Owner

Yes, don't do it this way!

Use .Add rather than a reset if you can, with something (I think that's what you mean by paging) as a backing store. It works much faster that way, and P.S. .Add is thread-safe.

Here is why: if the item you added is out of scope, no events are raised from the observable, whereas ResetAsync always raises the most expensive event on the observable.
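
The difference in event cost can be illustrated with the standard .NET ObservableCollection<T>. This is only a sketch: it uses the framework collection as a stand-in, and it is an assumption that the virtualizing collection surfaces comparable Add and Reset notifications. The `Record` helper is hypothetical and exists only for this demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

public static class CollectionEventDemo
{
    // Records the action of every CollectionChanged event raised while `work` runs.
    public static List<NotifyCollectionChangedAction> Record(
        ObservableCollection<string> items, Action work)
    {
        var seen = new List<NotifyCollectionChangedAction>();
        NotifyCollectionChangedEventHandler handler = (sender, e) => seen.Add(e.Action);
        items.CollectionChanged += handler;
        try { work(); }
        finally { items.CollectionChanged -= handler; }
        return seen;
    }

    public static void Main()
    {
        var items = new ObservableCollection<string> { "line 1", "line 2" };

        // Appending raises a single Add event that names the new item and its
        // index, so a bound control only has to deal with that one row.
        var onAdd = Record(items, () => items.Add("line 3"));
        Console.WriteLine(string.Join(",", onAdd));

        // Clear raises Reset, which carries no item details, so a bound control
        // has to re-read everything it is showing.
        var onClear = Record(items, () => items.Clear());
        Console.WriteLine(string.Join(",", onClear));
    }
}
```

A Reset gives listeners no information about what changed, which is why it is the costly notification for a bound list control.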

@sgdowney
Author

So I would use the provider's GetItemsAtAsync call to retrieve the additional rows (could be 1000 or so) and use the .Add or .AddRange methods of the collection?

@anagram4wander
Owner

Hmm... I have not tried that, but yes.

@sgdowney
Author

This is what I attempted:

PagedSourceItemsPacket<RecordItem> newItems;
int providerCount;
providerCount = textProvider.GetCountAsync().Result;
newItems = textProvider.GetItemsAtAsync(myData.Count, providerCount - myData.Count, true).Result;
myData.AddRange(newItems.Items);

The AddRange call ends up invoking a call to the GetItemsAt method on the provider. Since this is an async provider, GetItemsAt is not implemented.

@anagram4wander
Owner

Hmm... you are right. Let me think on that one.

@sgdowney
Author

I must be approaching this the wrong way.
I need to browse through rather large log files (sometimes up to 4 GB). The provider I wrote uses a background task to read through the file and create a list of pages. Each page contains the file offset in bytes where the data begins, the size of the data chunk in bytes, the starting line number and the ending line number.
For example, the first page would have the above information for lines 1 - 150 of the file, the next page for lines 151 - 300, etc.
When the provider's GetItemsAtAsync method is called, I search through my internal page list to locate the pages that contain the records starting at the pageoffset row for count number of rows. I then open the file, seek to the position, read the data and return the appropriate rows.
Because it can take a few minutes to read through the entire file, I set the initial count after a couple of seconds of file reading. This allows the program to display the first part of the file. As the provider processes the file, I need to signal the collection to update because more data is available.
Ideally, the PaginationManager would be able to subscribe to a CountChanged event from the provider I've written, but I don't think it does at the moment.
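
The page lookup described above can be sketched as follows. This is a minimal illustration, not the actual provider: `PageEntry` and `PageIndex.FindPages` are hypothetical names, the property names are borrowed from the provider code posted later in this thread, and the byte offsets in the demo are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical page descriptor; its exact shape is an assumption.
public sealed class PageEntry
{
    public long FileOffset { get; set; }  // byte position where the page starts
    public long Capacity { get; set; }    // size of the page's data in bytes
    public int LowerBound { get; set; }   // first line number on the page (1-based)
    public int UpperBound { get; set; }   // last line number on the page (inclusive)
}

public static class PageIndex
{
    // Returns the pages that hold any of the `count` rows starting at the
    // zero-based row `pageOffset`.
    public static List<PageEntry> FindPages(IEnumerable<PageEntry> pages, int pageOffset, int count)
    {
        int firstLine = pageOffset + 1;    // convert to 1-based line numbers
        int lastLine = pageOffset + count;
        // Two inclusive ranges overlap when each starts before the other ends.
        return pages.Where(p => firstLine <= p.UpperBound && lastLine >= p.LowerBound).ToList();
    }

    public static void Main()
    {
        var pages = new List<PageEntry>
        {
            new PageEntry { FileOffset = 0, Capacity = 9000, LowerBound = 1, UpperBound = 150 },
            new PageEntry { FileOffset = 9000, Capacity = 8700, LowerBound = 151, UpperBound = 300 },
            new PageEntry { FileOffset = 17700, Capacity = 9100, LowerBound = 301, UpperBound = 450 },
        };

        // Rows 140..159 are lines 141..160, which straddle the first two pages.
        foreach (var p in FindPages(pages, 140, 20))
            Console.WriteLine($"lines {p.LowerBound}-{p.UpperBound} at byte {p.FileOffset}");
    }
}
```

Because each entry records a byte offset and size, only the matching pages need to be read from disk to satisfy a request.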

@anagram4wander
Owner

Let me mull on this over the weekend when I have some time. The problem is that you don't know the size of the collection until you get to the end, and you don't want the whole collection in memory, so you can't just poke it into a List.

Interesting problem.

@anagram4wander
Owner

Could you post the class for the provider that you are using?

@sgdowney
Author

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Threading;
using AlphaChiTech.Virtualization;
using TextReader.Model;
using System.Collections.ObjectModel;
using System.ComponentModel;

namespace TextReader.Provider
{
    public class TextReaderProvider : IPagedSourceProviderAsync<RecordItem>
    {

        private static NLog.Logger classLogger = NLog.LogManager.GetCurrentClassLogger();

        private DispatcherTimer countTimer;
        private bool fileReadComplete;

        private string fileName;

        public TextReaderProvider(string textFileName)
        {
            masterLineCount = 0;
            pageList = new Collection<PageClass>();
            fileName = textFileName;
            fileReadComplete = false;
            // Index the file on a background thread so the constructor returns immediately.
            Task<int> taskReadFile = Task.Run(() => readStream());

            countTimer = new DispatcherTimer();
            countTimer.Interval = TimeSpan.FromSeconds(1);
            countTimer.Tick += countTimer_Tick;
            countTimer.Start();


        }

        void countTimer_Tick(object sender, EventArgs e)
        {
            countTimer.Stop();
            OnRaiseCountChanged(new EventArgs());
            countTimer.Interval = TimeSpan.FromSeconds(10);
            if (!fileReadComplete)
            {
                countTimer.Start();
            }
        }




        private int masterLineCount;
        public int Count
        {
            get
            {
                throw new NotImplementedException();
            }
        }

        public PagedSourceItemsPacket<RecordItem> GetItemsAt(int pageoffset, int count, bool usePlaceholder)
        {

            throw new NotImplementedException();

        }

        public int IndexOf(RecordItem item)
        {
            throw new NotImplementedException();
        }

        public void OnReset(int count)
        {
            // Intentionally empty: this provider does nothing when the collection resets.
        }

        public Task<int> GetCountAsync()
        {
            // The count is already in memory, so return a completed task.
            return Task.FromResult(masterLineCount);
        }

        public Task<PagedSourceItemsPacket<RecordItem>> GetItemsAtAsync(int pageoffset, int count, bool usePlaceholder)
        {
            return Task.Run(() =>
            {
                PagedSourceItemsPacket<RecordItem> pageItems;
                pageItems = new PagedSourceItemsPacket<RecordItem>();
                pageItems.LoadedAt = DateTime.Now;
                pageItems.Items = SupplyPageOfData(pageoffset, count);
                return pageItems;
            });
        }

        public RecordItem GetPlaceHolder(int index, int page, int offset)
        {
            return new RecordItem() { DataElement = string.Format("Waiting [{0}/{1}]", page, offset) };
        }

        public Task<int> IndexOfAsync(RecordItem item)
        {
            // Lookup by item is not supported; -1 means "not found".
            return Task.FromResult(-1);
        }




        public event EventHandler CountChanged;

        protected virtual void OnRaiseCountChanged( EventArgs e)
        {
            EventHandler handler = CountChanged;
            if (handler != null)
            {
                handler(this, e);
            }
        }


        private Collection<Model.PageClass> pageList;

        private List<RecordItem> SupplyPageOfData(int pageOffset, int count)
        {
            List<RecordItem> dataItems;
            IEnumerable<PageClass> pageItems;
            List<PageClass> myPages = null;
            int pageLowerBoundary = pageOffset + 1;
            int pageUpperBoundary = pageOffset + count;
            try
            {
                myPages = pageList.ToList();
                pageItems = from item in myPages where pageLowerBoundary <= item.UpperBound && pageUpperBoundary >= item.LowerBound select item;
                dataItems = FetchPage(pageItems, pageLowerBoundary, count);
            }
            catch (Exception ex)
            {
                int myPagesCount = 0;
                if (myPages != null)
                {
                    myPagesCount = myPages.Count;
                }
                classLogger.Error(ex, "pageOffset: {0}  count: {1}, myPages Count: {2}", pageOffset, count, myPagesCount);
                throw;
            }
            return dataItems;
        }

        private List<RecordItem> FetchPage(IEnumerable< PageClass> pageItems, int lowerBoundary, int count)
        {
            System.IO.FileStream sourceStream = null;
            UTF8Encoding enc;
            List<RecordItem> dataItems;
            byte[] fileBytes;
            int[] newLineIndexes;
            int newLineIndex;
            byte[] newLineBytes;
            int newLinePos;
            int startIndex=0;
            string data;
            int dataCount;
            Int64 dataIndex;
            int stringLength = 0;

            dataItems = new List<RecordItem>();
            fileBytes = new byte[0];
            try
            {
                sourceStream = new System.IO.FileStream(fileName, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.Read);
                enc = new UTF8Encoding();
                newLineBytes = enc.GetBytes(Environment.NewLine);
                dataCount = 0;

                foreach (var pageItem in pageItems)
                {
                    dataIndex = pageItem.LowerBound-1;
                    sourceStream.Position = pageItem.FileOffset;
                    fileBytes = new byte[pageItem.Capacity - 1];
                    sourceStream.Read(fileBytes, 0, fileBytes.Length);
                    newLineIndexes = fileBytes.SelectMany((s, i) => { if (s == newLineBytes[0]) { return new int[1] { i }; } else { return new int[0] { }; } }).ToArray();
                    startIndex = 0;
                    newLineIndex = 0;
                    while (newLineIndex < newLineIndexes.Length)
                    {
                        dataIndex += 1;
                        newLinePos = newLineIndexes[newLineIndex];
                        if (dataIndex >= lowerBoundary)
                        {
                            dataCount += 1;
                            if (dataCount <= count)
                            {
                                stringLength = newLinePos - startIndex;// +2;
                                data = enc.GetString(fileBytes, startIndex, stringLength);
                                dataItems.Add(new RecordItem() { DataIndex = dataIndex, DataElement = data });
                            }
                            else
                            {
                                newLineIndex = newLineIndexes.Length;
                            }
                        }
                        startIndex = newLinePos + 2;
                        newLineIndex += 1;
                    }
                }
            }
            catch (Exception ex)
            {
                classLogger.Error(ex, "fileBytes Len: {0}  startIndex: {1}  stringLength: {2}", fileBytes.Length, startIndex, stringLength);
                throw;
            }
            finally
            {
                if (sourceStream != null)
                {
                    sourceStream.Close();
                }
            }
            return dataItems;
        }


        private int readStream()
        {
            System.IO.FileStream sourceStream = null;
            byte[] fileBuffer;
            int bytesRead;
            System.Text.UTF8Encoding enc;
            int lineCount;
            int pageSize = 150;
            PageClass pageItem;
            int[] newLineIndexes;
            int newLineIndexSelector;
            Int64 fileRepositionAmount;
            byte[] newLineBytes;

            try
            {
                enc = new UTF8Encoding();
                newLineBytes = enc.GetBytes(Environment.NewLine);
                masterLineCount = 0;
                sourceStream = new System.IO.FileStream(fileName, System.IO.FileMode.Open, System.IO.FileAccess.Read, System.IO.FileShare.Read);
                pageItem = new PageClass();
                pageItem.FileOffset = sourceStream.Position;
                pageItem.LowerBound = 1;
                lineCount = 0;
                fileBuffer = new byte[4095];
                bytesRead = sourceStream.Read(fileBuffer, 0, fileBuffer.Length);
                while (bytesRead > 0)
                {
                    newLineIndexes = fileBuffer.SelectMany((s, i) => { if (s == newLineBytes[0]) { return new int[1] { i }; } else { return new int[0] { }; } }).ToArray();
                    lineCount += newLineIndexes.Length;
                    if (lineCount >= pageSize)
                    {
                        masterLineCount += pageSize;
                        newLineIndexSelector = newLineIndexes.Length - 1 - (lineCount - pageSize);
                        fileRepositionAmount = bytesRead - (newLineIndexes[newLineIndexSelector] + 2);
                        sourceStream.Position -= fileRepositionAmount;
                        pageItem.Capacity = sourceStream.Position - pageItem.FileOffset;
                        pageItem.UpperBound = pageItem.LowerBound + pageSize - 1;
                        pageList.Add(pageItem);
                        pageItem = new PageClass();
                        pageItem.LowerBound = masterLineCount + 1;
                        pageItem.FileOffset = sourceStream.Position;
                        lineCount = 0;
                    }
                    Array.Clear(fileBuffer, 0, fileBuffer.Length);
                    bytesRead = sourceStream.Read(fileBuffer, 0, fileBuffer.Length);
                }
                if (lineCount > 0)
                {
                    masterLineCount += lineCount;
                    pageItem.Capacity = sourceStream.Position - pageItem.FileOffset;
                    pageItem.UpperBound = pageItem.LowerBound + lineCount - 1;
                    pageList.Add(pageItem);
                }
            }
            catch (Exception ex)
            {
                classLogger.Error(ex);
                throw;
            }
            finally
            {
                if (sourceStream != null)
                {
                    sourceStream.Close();
                }
                fileReadComplete = true;
            }
            return masterLineCount;
        }

    }
}
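
As an alternative to scanning each buffer with LINQ and repositioning the stream, the page index can be built in one forward pass by tracking the absolute byte offset of every line end. The sketch below is not the provider above, only an illustration of that technique. `LineIndexer` and `BuildPageOffsets` are hypothetical names, and it assumes lines end in '\n' (which also covers "\r\n") in an ASCII-compatible encoding such as UTF-8.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class LineIndexer
{
    // Returns the byte offset at which each page of `linesPerPage` lines starts.
    // The first page always starts at offset 0.
    public static List<long> BuildPageOffsets(Stream source, int linesPerPage, out int totalLines)
    {
        var offsets = new List<long> { 0 };
        var buffer = new byte[4096];
        long position = 0;       // absolute offset of buffer[0]
        int lines = 0;
        bool trailing = false;   // true when bytes follow the last '\n' seen
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Scan only the bytes actually read, not the whole buffer.
            for (int i = 0; i < read; i++)
            {
                if (buffer[i] == (byte)'\n')
                {
                    lines++;
                    trailing = false;
                    if (lines % linesPerPage == 0)
                        offsets.Add(position + i + 1);   // next page starts after this '\n'
                }
                else
                {
                    trailing = true;
                }
            }
            position += read;
        }
        if (trailing) lines++;   // count a final line that has no terminator

        // Drop a last offset that points at end of file, since no page starts there.
        if (offsets.Count > 1 && offsets[offsets.Count - 1] == position)
            offsets.RemoveAt(offsets.Count - 1);

        totalLines = lines;
        return offsets;
    }

    public static void Main()
    {
        var bytes = Encoding.ASCII.GetBytes("a\nbb\nccc\ndddd\neeeee\n");
        int total;
        var offsets = BuildPageOffsets(new MemoryStream(bytes), 2, out total);
        Console.WriteLine(total + " lines, pages start at " + string.Join(",", offsets));
    }
}
```

Since the offsets come from a running position rather than from the stream's current position, a line that straddles two buffer reads needs no special handling.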

@sgdowney
Author

sgdowney commented Jul 6, 2015

While calling ResetAsync() on the collection may not be the most efficient method, it has been effective whenever the line count is updated. From a performance perspective, the resets don't seem to affect the functioning of the application.

@pluskal
Contributor

pluskal commented Aug 13, 2015

Hi! Have you solved the issue with notification subscription in PaginationManager? I have a similar use case that would really benefit from this functionality. I have a very large collection of models that I need to display (let's say around a million). It is very inefficient to create a VM for every model in the collection, so I wanted to use VirtualizingObservableCollection and its IPagedSourceProvider to create VMs only on demand, when the UI wants to display them.

@sgdowney
Author

My UI app listens for the CountChanged event that is raised by my provider. On the CountChanged event I issue a ResetAsync() on the collection. For my scenario that works fine.
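
A minimal sketch of that wiring is below. It is not the actual view model: `FakeProvider` and `CountChangedRelay` are hypothetical stand-ins, and the reset is passed in as a plain callback so that nothing is assumed about the collection's ResetAsync signature. Skipping ticks where the count has not moved is an optional extra to avoid needless resets.

```csharp
using System;

// Hypothetical stand-in for the provider: it exposes only what the relay needs.
public sealed class FakeProvider
{
    public int LineCount { get; private set; }
    public event EventHandler CountChanged;

    public void Report(int lineCount)
    {
        LineCount = lineCount;
        var handler = CountChanged;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

// Forwards CountChanged to a reset callback, skipping notifications where the
// count has not changed since the previous reset.
public sealed class CountChangedRelay : IDisposable
{
    private readonly FakeProvider provider;
    private readonly Action reset;
    private int lastCount = -1;

    public CountChangedRelay(FakeProvider provider, Action reset)
    {
        this.provider = provider;
        this.reset = reset;
        provider.CountChanged += OnCountChanged;
    }

    private void OnCountChanged(object sender, EventArgs e)
    {
        if (provider.LineCount == lastCount) return;
        lastCount = provider.LineCount;
        reset();   // in the real view model this would trigger the collection reset
    }

    // Unsubscribe so the provider does not keep the relay alive.
    public void Dispose() { provider.CountChanged -= OnCountChanged; }
}

public static class RelayDemo
{
    public static void Main()
    {
        var provider = new FakeProvider();
        int resets = 0;
        using (new CountChangedRelay(provider, () => resets++))
        {
            provider.Report(150);
            provider.Report(150);   // unchanged, so no second reset
            provider.Report(300);
        }
        provider.Report(450);       // relay disposed, so this is ignored
        Console.WriteLine(resets);
    }
}
```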
