Pranay Rana

Monday, November 2, 2020

Work Partitioning - Processing a large number of records in C#

What is Work Partitioning?

Work partitioning means dividing a large amount of work among workers. If a piece of work would take 100 hours for one worker to complete and you divide it among, say, 10 workers working in parallel, the 100 hours of work gets done in roughly 10 hours. So work partitioning helps complete work faster.

One can apply work partitioning to process data faster by dividing the work among a set of machines (as in big data, where high-volume data is processed by a fleet of machines located on premises or in the cloud), or by spreading incoming requests across a set of machines with the help of load balancers. But in this post I am going to look at partitioning work within a single process/application using the Task Parallel Library in C#.

Below is code that makes use of the C# Task Parallel Library to process a large number of records, here 1 million of them. (The snippets assume the usual usings: System, System.Collections.Generic, System.Linq, System.Text.RegularExpressions and System.Threading.Tasks.)

public List<TransFormedCustomer> GetProcessData()
{
    var processRecordCount = 10000;
    var retModels = new List<TransFormedCustomer>();
    var lotOfRecords = GetFakeData(); //example: a million records from a data source like a database, web service or CSV file

    int tasksCount = (lotOfRecords.Count / processRecordCount);
    if (lotOfRecords.Count % processRecordCount > 0)
        tasksCount++;

    var tasks = new Task<List<TransFormedCustomer>>[tasksCount];
    for (int i = 0; i < tasksCount; i++)
    {
        int skip = (processRecordCount * i);
        int take = processRecordCount;

        //Skip(0).Take(n) is the same as Take(n), so one expression covers every batch
        IEnumerable<Customer> datas = lotOfRecords.Skip(skip).Take(take);

        //each task processes its own slice; skip/take travel as state so failures can be reported later
        tasks[i] = Task.Factory.StartNew((obj) => GetProccessedData(datas), Tuple.Create(skip, take));
#if DEBUG
        //for the last (possibly partial) batch, shrink 'take' so the log shows the real end of the range
        if (i == tasksCount - 1 && lotOfRecords.Count % processRecordCount != 0)
            take = lotOfRecords.Count % processRecordCount;
        Console.WriteLine($"task no {i} started, reading from {skip} to {skip + take}");
#endif
    }
    try
    {
        Task.WaitAll(tasks);
    }
    catch
    {
        //intentionally empty - failed tasks are logged in the foreach block below
    }

    foreach (var task in tasks)
    {
        if (task.IsCanceled || task.IsFaulted)
        {
            var state = (Tuple<int, int>)task.AsyncState;
            Console.WriteLine($"task failed to process data from {state.Item1} to {state.Item1 + state.Item2}");
        }
        else
            retModels.AddRange(task.Result);
     }

    return retModels;
} 
The code above works as follows (a short usage sketch follows the list):
  1. It sets the number of records one task can process; here it is 10,000, stored in the "processRecordCount" variable.
  2. The code gets 1 million records from the data source; here the "GetFakeData()" method (code listed at the end) returns 1 million customer records.
  3. The "tasksCount" variable stores the number of tasks needed to process the received records with "GetProccessedData(datas)" (method listed below), i.e. here 1,000,000 / 10,000 = 100 tasks are required.
  4. In the for loop, with the help of "Skip" and "Take", each task gets 10,000 records to process.
  5. "Task.WaitAll()" waits for all tasks to finish their work.
  6. After each task finishes its work, the processed records are added to the "retModels" collection.
  7. If a task fails to process its records, the failed range is logged to the console.
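A minimal sketch of how the method might be called and timed, assuming it lives in a wrapper class named CustomerProcessor (a name used here only for illustration):

//hypothetical driver - CustomerProcessor is an assumed wrapper class for the methods shown in this post
var processor = new CustomerProcessor();

var stopwatch = System.Diagnostics.Stopwatch.StartNew();
List<TransFormedCustomer> result = processor.GetProcessData();
stopwatch.Stop();

Console.WriteLine($"Processed {result.Count} records in {stopwatch.ElapsedMilliseconds} ms");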

Output

As shown in the image above, each task processes 10,000 records and each one picks up a different set of records.
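Based on the Console.WriteLine format in the code, the output has roughly this shape (the exact ordering of tasks varies between runs):

task no 0 started, reading from 0 to 10000
task no 1 started, reading from 10000 to 20000
task no 2 started, reading from 20000 to 30000
...
task no 99 started, reading from 990000 to 1000000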

The following are the advantages of work partitioning (a sketch of sizing batches by core count follows this list):
  1. Very large amounts of data get processed faster because the data is processed in parallel with the help of tasks, i.e. workers.
  2. It makes use of multi-core CPUs, i.e. it computes efficiently, as each task is mostly processed by a single core.
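A small sketch, assuming you want batch sizes to track the machine rather than a hard-coded 10,000, of deriving the batch size from Environment.ProcessorCount (lotOfRecords refers to the collection in GetProcessData above):

//assumed tweak, not part of the code above: size batches from the core count instead of a fixed value
int workerCount = Environment.ProcessorCount; //e.g. 8 on an 8-core machine
int batchSize = (int)Math.Ceiling(lotOfRecords.Count / (double)workerCount);
//'batchSize' would then take the place of the hard-coded processRecordCount value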
Code used to support the above function

GetFakeData - method

Fake data is produced using the Faker.Net package ("Install-Package Faker.Net").

private List<Customer> GetFakeData()
{
    var customers = new List<Customer>();
    for (int i = 0; i < 1000000; i++) //1 million fake customer records
    {
        customers.Add(
            new Customer()
            {
                FirstName = Faker.Name.First(),
                LastName = Faker.Name.Last(),
                EmailAddress = Faker.Internet.Email(),
                SSN = Faker.Identification.SocialSecurityNumber()
            }
            );
    }
    return customers;
}
GetProccessedData - method
private List<TransFormedCustomer> GetProccessedData(IEnumerable<Customer> customers)
{
    var transFormedCustomers = new List<TransFormedCustomer>();
    foreach (var customer in customers)
    {
        //do processing on received data
        //enrich the data and add it to the processed collection
        string formattedSSN = customer.SSN.Insert(5, "-").Insert(3, "-");
        string formattedName = customer.FirstName + ' ' + customer.LastName;
        string email = customer.EmailAddress;
        Regex regex = new Regex(@"^([\w\.\-]+)@([\w\-]+)((\.(\w){2,3})+)$");
        Match match = regex.Match(email);
        if (!match.Success)
            email = "dummy@dummy.com";
        var transFormedCustomer = new TransFormedCustomer()
        {
            Name = formattedName,
            SSN = formattedSSN,
            EmailAddress = email
        };

        transFormedCustomers.Add(transFormedCustomer);
    }
    return transFormedCustomers;
}
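One small tweak worth considering: the Regex above is constructed for every customer inside the loop. A minimal sketch (an assumed refactoring, not part of the code above) of hoisting it into a static, compiled field so the pattern is built only once:

//assumed refactoring: build the email pattern once and reuse it for every record
private static readonly Regex EmailRegex =
    new Regex(@"^([\w\.\-]+)@([\w\-]+)((\.(\w){2,3})+)$", RegexOptions.Compiled);

//inside the loop, the per-record construction then becomes:
//if (!EmailRegex.IsMatch(email))
//    email = "dummy@dummy.com";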
Customer & TransFormedCustomer model classes
    public class Customer
    {
        public string EmailAddress { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
        public string SSN { get; set; }
    }

    public class TransFormedCustomer
    {
        public string EmailAddress { get; set; }
        public string Name { get; set; }
        public string SSN { get; set; }
    }
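
As an aside, the Task Parallel Library also has a built-in way to partition a range of indexes: System.Collections.Concurrent.Partitioner combined with Parallel.ForEach. Below is a rough sketch (not a drop-in replacement) of the same batching expressed that way, reusing GetProccessedData and the lotOfRecords / processRecordCount variables from GetProcessData:

using System.Collections.Concurrent;

//sketch: alternative partitioning of the same work - ranges of 10,000 indexes handled by Parallel.ForEach
var transformed = new ConcurrentBag<TransFormedCustomer>();
var ranges = Partitioner.Create(0, lotOfRecords.Count, processRecordCount);

Parallel.ForEach(ranges, range =>
{
    var batch = lotOfRecords.GetRange(range.Item1, range.Item2 - range.Item1);
    foreach (var item in GetProccessedData(batch))
        transformed.Add(item);
});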