Multithreading

Dado un número de threads -al menos 3- deben realizar un trabajo hasta recibir una señal para que se detenga su ejecución.

Al inicio todos los hilos de ejecución están en modo de espera y solo cuando reciben una determinada señal imprimen información, esta señal será emitida cada segundo, al azar a un thread a la vez.

using System;
using System.Threading;
using System.Collections.Generic;

namespace Module02
{
    public class MyThread
    {
        public int ID { get; set; }
        public bool FLAG { get { return mRES.IsSet; } }
        public ManualResetEventSlim mRES { get; set; }
    }

    public class Program1
    {
        private static List<Thread> threads;
        private static List<ManualResetEventSlim> listMRES;

        [MTAThread]
        public static void Main()
        {
             int nThreads;
             Random rand = new Random();
             Console.CancelKeyPress += new ConsoleCancelEventHandler(Console_CancelKeyPress);

             do
             {
                 Console.WriteLine("Enter the number of threads (n > 3)");

                 if (Int32.TryParse(Console.ReadLine(), out nThreads) && nThreads > 3)
                 {
                     threads = new List<Thread>(nThreads);
                     listMRES = new List<ManualResetEventSlim>(nThreads);

                     for (int i = 1; i <= nThreads; i++)
                     {
                         listMRES.Add(new ManualResetEventSlim(false)); //unsignaled initialized
                        
                         MyThread threadData = new MyThread
                         {
                             ID = i,
                             mRES = listMRES[i - 1]
                         };

                         Thread t = new Thread(new ParameterizedThreadStart(ThreadProc));
                         t.Start(threadData);
                         threads.Add(t);
                     }
                    
                     // release one thread each second until all threads have been released
                     while (true)
                     {
                         listMRES[rand.Next(0, nThreads)].Set();
                         Thread.Sleep(1000);
                     }
                 }
                 else
                 {
                     nThreads = 0;
                 }
            } while (nThreads > 3);
        }
     
        static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.SpecialKey == ConsoleSpecialKey.ControlC)
            {
                foreach (Thread current in threads)
                {
                    current.Interrupt();
                    current.Join();
                }
            }
        }

        public static void ThreadProc(object data)
        {
            var currentData = (MyThread) data;

            try
            {
                while (true)
                {
                    while (currentData.FLAG)
                    {
                        Console.WriteLine("Thread ID: {0}", currentData.ID);
                        Thread.Sleep(1000);
                    }
                    currentData.mRES.Wait();
                }
            }
            catch (ThreadInterruptedException) { return; }
        }
    }
}

Encontrar los índices de los elementos mínimos y máximos dentro de un arreglo. Se da la dimensión del arreglo al inicio del programa y la busqueda es separada en hilos de ejecución, al final se muestra los resultados y el tiempo de ejecución.


using System;
using System.Linq;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace Module02
{
    public class Program2
    {    
        private static int n = 20000000;
        private static int[] a = new int[n];

        private static List < int > mins = new List < int > ();
        private static List < int > maxs = new List < int > ();

        private static List < Task > tasks = new List < Task > ();
        private static Stopwatch stopwatch = new Stopwatch();

        [MTAThread]
        public static void Main()
        {
            Random rand = new Random();

            /*
            do
            {
                Console.WriteLine("Enter the size of the array");

                if (!Int32.TryParse(Console.ReadLine(), out n))
                {
                    n = 0;
                }
            } while (n < 3);
            */

            Task t1 = new Task(() => {
                for (int i = 0; i < n; i++)
                {
                    a[i] = rand.Next();
                }
            });

            stopwatch.Reset();
            stopwatch.Start();

            t1.Start();
            t1.Wait();

            stopwatch.Stop();

            Console.WriteLine($"Array filled! in {stopwatch.ElapsedMilliseconds} ms");
 
            startWithTasks(2);
            startWithTasks(4);
            startWithTasks(8);
            startWithTasks(16);
 
            /*
            for (int i = 1; i <= 8; i++)
            {
                int temp = i;
                int val = i * n / 8;
                int fromVal = (i - 1) * n / 8;

                Console.WriteLine($"from: {fromVal} to: {val}");
                Task tMin = new Task(() =>
                {
                    int tmp = a[0];

                    for (int j = fromVal; j < val; j++)
                    {
                        if (tmp > a[j]) tmp = a[j];
                    }
                    mins.Add(tmp);
                });
 
                tMin.Start();
                Task tMax = new Task(() =>
                {
                    int tmp = a[0];

                    for (int k = fromVal; k < val; k++)
                    {
                        if (tmp < a[k]) tmp = a[k];
                    }

                    maxs.Add(tmp);
                });

                tMax.Start();
                tasks.Add(tMin);
                tasks.Add(tMax);
            }

            Task.WaitAll(tasks.ToArray());
            Console.WriteLine($"Min: {mins.Min()} found!");
            Console.WriteLine($"Max: {maxs.Max()} found!");
            */
        }

        static void startWithTasks(int m)
        {
            stopwatch.Reset();
            stopwatch.Start();
 
            for (int i = 1; i <= m; i++)
            {
                int temp = i;
                int val = i * n / m;
                int fromVal = (i - 1) * n / m;

                Task tMin = new Task(() => {
                    int tmp = a[0];

                    for (int j = fromVal; j < val; j++)
                    {
                        if (tmp > a[j]) tmp = a[j];
                    }
                    mins.Add(tmp);
                });

                tMin.Start();

                Task tMax = new Task(() => {
                    int tmp = a[0];

                    for (int k = fromVal; k < val; k++)
                    {
                        if (tmp < a[k]) tmp = a[k];
                    }
                    maxs.Add(tmp);
                });

                tMax.Start();
                tasks.Add(tMin);
                tasks.Add(tMax);
            }

            Task.WaitAll(tasks.ToArray());
            stopwatch.Stop();

            Console.WriteLine($"Min: {mins.Min()} found!");
            Console.WriteLine($"Max: {maxs.Max()} found!");
            Console.WriteLine($"With {m} threads in {stopwatch.ElapsedMilliseconds} ms");

            mins.Clear();
            maxs.Clear();
            tasks.Clear();
        }
    }
}

El programa genera dos matrices con números enteros al azar, y procede a calcular la multiplicación de las mismas. La multiplicación de las matrices se las realiza en paralelo en distintos hilos de ejecución.


using System;
using System.Threading;
using System.Collections.Generic;

namespace Module02
{
    public class Program3
    {
         private static int n = 50;
         private static Random rand = new Random();
         private static int[,] a;
         private static int[,] b;
         private static int[,] result;

        [MTAThread]
        public static void Main()
        {
            do
            {
                Console.WriteLine("Enter the size of the matrix (>= 50)");

                if (!Int32.TryParse(Console.ReadLine(), out n))
                {
                    n = 0;
                }
            } while (n < 49);

            a = new int[n, n];

            Thread t1 = new Thread(() =>
            {
                for (int i = 0; i < n / 2; i++)
                {
                    for (int j = 0; j < n; j++)
                    {
                        a[i, j] = rand.Next();
                        Thread.Yield();
                    }
                }
            });

            t1.Start();

            Thread t2 = new Thread(() =>
            {
                for (int i = n / 2; i < n; i++)
                {
                    for (int j = 0; j < n; j++)
                    {
                        a[i, j] = rand.Next();
                        Thread.Yield();
                    }
                }
            });

            t2.Start();

            b = new int[n, n];

            Thread t3 = new Thread(() =>
            {
                for (int i = 0; i < n / 2; i++)
                {
                    for (int j = 0; j < n; j++)
                    {
                        b[i, j] = rand.Next();
                        Thread.Yield();
                    }
                }
            });

            t3.Start();

            Thread t4 = new Thread(() =>
            {
                for (int i = n / 2; i < n; i++)
                {
                    for (int j = 0; j < n; j++)
                    {
                        b[i, j] = rand.Next();
                        Thread.Yield();
                    }
                }
            });

            t4.Start();

            t1.Join();
            Console.WriteLine($"T1 finished!");
            t2.Join();
            Console.WriteLine($"T2 finished!");
            t3.Join();
            Console.WriteLine($"T3 finished!");
            t4.Join();

            Console.WriteLine($"T4 finished!");
            Console.WriteLine($"A {a.Length} elements");
            Console.WriteLine($"B {b.Length} elements");

            /*
            a[0, 0] = 10;
            a[0, 1] = 20;
            a[0, 2] = 10;

            a[1, 0] = 4;
            a[1, 1] = 5;
            a[1, 2] = 6;

            a[2, 0] = 2;
            a[2, 1] = 3;
            a[2, 2] = 5;

            b[0, 0] = 3;
            b[0, 1] = 2;
            b[0, 2] = 4;

            b[1, 0] = 3;
            b[1, 1] = 3;
            b[1, 2] = 9;

            b[2, 0] = 4;
            b[2, 1] = 4;
            b[2, 2] = 2;
            */


            // 130 120 240
            //  51  47  73
            //  35  33  45

            result = new int[n, n];
            List<Thread> threads = new List<Thread>();

            for (int i = 0; i < n * n; i++)
            {
                int temp = i;

                Thread thread = new Thread(() =>
                {
                    int i = temp / n;
                    int j = temp % n;

                    int[] x = GetRow(a, i);
                    int[] y = GetColumn(b, j);

                    for (int k = 0; k < x.Length; k++)
                    {
                        result[i, j] += x[k] * y[k];
                    }

                    //Console.WriteLine("Element [{0}, {1}]", i, j);
                });

                thread.Start();
                threads.Add(thread);
            }

            foreach (Thread t in threads)
            {
                t.Join();
            }

            Console.WriteLine("Matrix multiplication complete!");
        }

        static int[] GetColumn(int[,] arr, int i)
        {
            int[] res = new int[n];
            
            for (int j = 0; j < n; j++)
            {
                res[j] = arr[j, i];
            }
            return res;
        }

        static int[] GetRow(int[,] arr, int i)
        {
            int[] res = new int[n];

            for (int j = 0; j < n; j++)
            {
                res[j] = arr[i, j];
            }
            return res;
        }
    }
}

El programa realiza lo siguiente:

  1. Un thread emite números enteros y los ingresa en un queue, si el queue está lleno el thread “espera”
  2. Otros threads consumen los números del queue y pasan a un modo de espera
  3. El thread principal imprime la información del queue y de los diferentes threads y sus estados

using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace Module02
{
    public class ConsumerThread
    {
        public int ID { get; set; }
        public bool IS_BUSY { get; set; }
    }

    public class ProducerThread
    {
        public int ID { get; set; }
        public int QUEUE_SIZE { get; set; }
    }

    public class Program4
    {
        private static ConcurrentQueue<int> queue;
        private static List<Thread> threads;
        private static Thread producerThread;

        private static Random rand = new Random();

        [MTAThread]
        public static void Main()
        {
            int consumers;
            int queueSize;
            Console.CancelKeyPress += new ConsoleCancelEventHandler(Console_CancelKeyPress);
 
            do
            {
                Console.WriteLine("Enter the number of consumers (n > 0)");

                if (Int32.TryParse(Console.ReadLine(), out consumers) && consumers > 0)
                {
                    threads = new List<Thread>(consumers);

                    do
                    {
                        Console.WriteLine("Enter the size of the queue (n > 4)");

                        if (Int32.TryParse(Console.ReadLine(), out queueSize) && queueSize > 4)
                        {
                            queue = new ConcurrentQueue<int>();

                            for (int i = 0; i < queueSize; i++)
                            {
                                queue.Enqueue(rand.Next());
                            }

                            ProducerThread producerData = new ProducerThread
                            {
                                ID = 1,
                                QUEUE_SIZE = queueSize
                            };

                            producerThread = new Thread(new ParameterizedThreadStart(ProducerThreadProc));
                            producerThread.Start(producerData);

                            for (int i = 1; i <= consumers; i++)
                            {
                                ConsumerThread threadData = new ConsumerThread
                                {
                                    ID = i,
                                    IS_BUSY = false
                                };

                                Thread t = new Thread(new ParameterizedThreadStart(ConsumerThreadProc));
                                t.Start(threadData);
                                threads.Add(t);
                            }

                            while (true)
                            {
                                Console.WriteLine("Current Queue size: {0}", queue.Count);

                                Console.WriteLine("State of Consumers");

                                for (int t = 1; t <= threads.Count; t++)
                                {
                                    Console.WriteLine("Consumer : {0} -> {1}", t, threads[t - 1].ThreadState);
                                }

                                Thread.Sleep(1000);
                            }
                        }
                        else
                        {
                            queueSize = 0;
                        }
                    } while (queueSize > 4);
                }
                else
                {
                    consumers = 0;
                }
            } while (consumers > 0);
        }

        static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.SpecialKey == ConsoleSpecialKey.ControlC)
            {
                producerThread.Interrupt();

                foreach (Thread current in threads)
                {
                    current.Interrupt();
                }
            }
        }

        public static void ProducerThreadProc(object data)
        {
            var currentData = (ProducerThread)data;
 
            try
            {
                while (true)
                {
                    if (queue.Count < currentData.QUEUE_SIZE)
                    {
                        queue.Enqueue(rand.Next());
                    }

                    Thread.Sleep(1000);
                    Thread.Yield();
                }
            }
            catch (ThreadInterruptedException) { return; }
        }

        public static void ConsumerThreadProc(object data) 
        {
            try
            {
                while (true)
                {
                    if (queue.Count > 0)
                    {
                        if (queue.TryDequeue(out int m))
                        {
                            Thread.Sleep(2000);
                        }
                    }
                    Thread.Yield();
                }
            }
            catch (ThreadInterruptedException) { return; }
        }
    }
}

Un thread genera una tupla de enteros representando lo siguiente:

  • Tipo de tarea
  • Prioridad
  • Número para el queue

El thread añade los valores al queue, y si este está lleno el hilo de ejecución pasa a un estado de espera.


using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace Module02
{
    struct GroupThread
    {
        public int ID { get; set; }
        public List<Thread> threadsList{ get; set; }
    }

    public class Program5
    {
        private static int groups;
        private static int queueSize;
        private static List<GroupThread> threads;
        private static Thread producerThread;

        private static Mutex mutex = new Mutex();
        private static Random rand = new Random();

        private static ConcurrentQueue<Tuple<int, int, int>> queue;

 

        [MTAThread]
        public static void Main()
        {
            int consumers;

            Console.CancelKeyPress += new ConsoleCancelEventHandler(Console_CancelKeyPress);

            do
            {
                Console.WriteLine("Enter the number of groups (n > 0)");

                if (Int32.TryParse(Console.ReadLine(), out groups) && groups > 0)
                {
                    do
                    {
                        Console.WriteLine("Enter the max number of consumers for each group (n > 0)");

                        if (Int32.TryParse(Console.ReadLine(), out consumers) && consumers > 0)
                        {
                            do
                            {
                                Console.WriteLine("Enter the size of the queue (n > 4)");

                                if (Int32.TryParse(Console.ReadLine(), out queueSize) && queueSize > 4)
                                {
                                    threads = new List<GroupThread>(groups);
                                    queue = new ConcurrentQueue<Tuple<int, int, int>>();

                                    for (int i = 0; i < queueSize; i++)
                                    {
                                        queue.Enqueue(Tuple.Create(rand.Next(0, groups), rand.Next(), rand.Next()));
                                    }

                                    producerThread = new Thread(new ThreadStart(ProducerThreadProc));
                                    producerThread.Start();

                                    for (int i = 1; i <= groups; i++)
                                    {
                                        GroupThread groupThread = new GroupThread
                                        {
                                            ID = i,

                                            threadsList = new List<Thread>(rand.Next(1, consumers))
                                        };

                                        for (int j = 0; j < groupThread.threadsList.Capacity; j++)
                                        {
                                            Thread ct = new Thread(new ParameterizedThreadStart(ConsumerThreadProc));

                                            ct.Start(groupThread);
                                            groupThread.threadsList.Add(ct);
                                        }
                                        threads.Add(groupThread);
                                    }

                                    while (true)
                                    {
                                        Console.WriteLine("Current Queue size: {0}", queue.Count);

                                        Console.WriteLine("State of Groups");

                                        for (int i = 0; i < threads.Count; i++)
                                        {
                                            Console.WriteLine("Group: {0}", i + 1);

                                            for (int j = 1; j <= threads[i].threadsList.Count; j++)
                                            {
                                                Console.WriteLine("- {0} -> {1}", j, threads[i].threadsList[j - 1].ThreadState);
                                            }

                                            Console.WriteLine();
                                        }

                                        Thread.Sleep(1000);
                                    }
                                }
                                else
                                {
                                    queueSize = 0;
                                }
                            } while (queueSize > 4);
                        }
                        else
                        {
                            consumers = 0;
                        }
                    } while (consumers > 0);
                }
                else
                {
                    groups = 0;
                }
            } while (groups > 0);
        }

        static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.SpecialKey == ConsoleSpecialKey.ControlC)
            {
                producerThread.Interrupt();

                foreach (GroupThread currentGroup in threads)
                {
                    foreach (Thread current in currentGroup.threadsList)
                    {
                        current.Interrupt();
                    }
                }
                //mutex.Dispose();
            }
        }

        public static void ProducerThreadProc()
        {
            try
            {
                while (true)
                {
                    if (queue.Count < queueSize)
                    {
                        queue.Enqueue(Tuple.Create(rand.Next(0, groups), rand.Next(), rand.Next()));
                    }

                    Thread.Sleep(4000);
                    Thread.Yield();
                }
            }
            catch (ThreadInterruptedException) { return; }
        }
 
        public static void ConsumerThreadProc(object data)
        {
            var currentData = (GroupThread)data;

            try
            {
                while (true)
                {
                    if (queue.Count > 0)
                    {
                        if (mutex.WaitOne())
                        {
                            if (queue.TryPeek(out Tuple<int, int, int> m) && m.Item1 == currentData.ID)
                            {
                                if (queue.TryDequeue(out Tuple<int, int, int> n))
                                {
                                    mutex.ReleaseMutex();

                                    Thread.Sleep(500);
                                }
                            }
                        }
                    }
                    Thread.Yield();
                }
            }
            catch (ThreadInterruptedException) { return; }

        }
    }
}