Que construção eu uso para garantir que 100 tarefas sejam executadas em paralelo?

5

Eu estava tentando criar um teste de integração para meu serviço em que 100 clientes se conectariam, efetuariam login, enviam solicitações e registram todas as respostas por um período de tempo configurável.

Eu estava construindo uma classe para o cliente usando soquetes assíncronos e funciona bem. Eu iniciei tudo usando Task e Task.Factory, enviei o login, e postou recebe toda vez que recebi dados, até o tempo expirar e então eu liguei para desligá-los.

Parece que isso nunca acontece em paralelo. Às vezes eu consigo correr de uma vez só, às vezes um pouco mais. Eu suponho que o agendador de tarefas esteja executando-os quando parece adequado e não de uma vez.

Agora entendo que não posso realmente ter 100 threads em execução simultânea, mas quero garantir que todos os 100 sejam iniciados e que o sistema operacional esteja alternando o contexto para frente e para trás tentando executá-los todos.

No final, quero simular um grande número de clientes conectados ao meu serviço, todos recebendo um fluxo de dados.

Qual construção eu uso se a Tarefa não funciona?

Tentativa atual:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegrationTests
{
    class Program
    {
        static void Main(string[] args)
        {
            string       server            = ConfigurationManager.AppSettings["server"];
            int          port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            int          numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            TimeSpan     clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            TimeSpan     timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            TimeSpan     reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            List<string> clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, to carry out test operations, each on their own threads
                Task[] tasks = new Task[numClients];
                for(int count = 0; count < numClients; ++count)
                {
                    var index = count;
                    tasks[count] = Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                // Reuse client Ids, if there are more clients then clientIds.
                                // Keep in mind that tasks are not necessarily started in the order they were created in this loop.
                                // We may see client id 1 be assigned client id 2 if another client was started before it, but we
                                // are using all clientIds
                                string clientId = null;
                                if (numClients < clientIds.Count)
                                {
                                    clientId = clientIds[index];
                                }
                                else
                                {
                                    clientId = clientIds[index % clientIds.Count];
                                }

                                // Create the actual client
                                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                                client.Startup();

                                // Will make an sync request issue a recv.
                                // Everytime we get a reponse, it will be logged and another recv will be posted.
                                // This will continue until shutdown is called
                                client.MakeRequest(symbol);

                                System.Threading.Thread.Sleep(clientLifetime);

                                client.Shutdown();
                            }
                            catch(Exception e)
                            {
                                // SNIP - Log it
                            }
                        });
                }
                Task.WaitAll(tasks);
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }
    }
}
    
por Christopher Pisz 15.12.2017 / 21:36
fonte

2 respostas

6

Tarefas e Threads existem para finalidades diferentes. As tarefas destinam-se a executar pequenas coisas que precisam ser executadas em segundo plano. Threads representam um recurso operacional para execução simultânea.

Internamente, o TaskManager usa um pool de threads para que ele possa reutilizar os threads para processar mais tarefas. Threads são caros para configurar e derrubar para que eles não funcionem bem para o propósito que as Tarefas foram criadas. Embora você possa influenciar o número de encadeamentos disponíveis para o gerenciador de tarefas, ele ainda é responsável por distribuir o trabalho para os encadeamentos.

Garantindo o número X de clientes simultâneos

A única maneira de garantir isso é usar Thread em vez de Task . Se você fosse reestruturar seu código um pouco, você poderia lidar com seus clientes simultâneos assim:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;

namespace IntegrationTests
{
    private static string server;
    private static int port;
    private static TimeSpan clientLifetime;
    private static TimeSpan timeout;
    private static TimeSpan reconnectInterval;
    private static List<string> clientIds;
    private static Barrier barrier;

    class Program
    {
        static void Main(string[] args)
        {
            int          numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            server            = ConfigurationManager.AppSettings["server"];
            port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;
            barrier           = new Barrier(numClients + 1);

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, to carry out test operations, each on their own threads
                Thread[] threads= new Thread[numClients];
                for(int count = 0; count < numClients; ++count)
                {
                    var index = count;
                    threads[count] = new Thread();
                    threads[count].Name = $"Client {count}"; // for debugging
                    threads[count].Start(RunClient);
                }

                // We loose the convenience of awaiting all tasks,
                // but use a thread barrier to block this thread until all the others are done.
                barrier.SignalAndWait();
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }

        private void RunClient()
        {
            try
            {
                // Reuse client Ids, if there are more clients then clientIds.
                // Keep in mind that tasks are not necessarily started in the order they were created in this loop.
                // We may see client id 1 be assigned client id 2 if another client was started before it, but we
                // are using all clientIds
                string clientId = null;
                if (numClients < clientIds.Count)
                {
                    clientId = clientIds[index];
                }
                else
                {
                    clientId = clientIds[index % clientIds.Count];
                }

                // Create the actual client
                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                client.Startup();

                // Will make an sync request issue a recv.
                // Everytime we get a reponse, it will be logged and another recv will be posted.
                // This will continue until shutdown is called
                client.MakeRequest(symbol);

                System.Threading.Thread.Sleep(clientLifetime);

                client.Shutdown();
            }
            catch(Exception e)
            {
                // SNIP - Log it
            }
            finally
            {
                barrier.SignalAndWait();
            }
        }
    }
}
    
por 15.12.2017 / 23:22
fonte
1

Eu não acho que haja um problema com seu teste.

Eu usei código semelhante para testes de carga (básicos) e vi mais de 100 tarefas simultâneas.

Eu acho que há um problema com a maneira como você está registrando. Você está simplesmente medindo o número de conexões simultâneas que seu servidor pode manipular?

Por exemplo, o código abaixo contará até 1.000.

No entanto, observe a diferença se substituirmos Task.Delay por Thread.Sleep. Isso quebra o aplicativo porque mais de uma tarefa é executada no mesmo segmento.

Agora, se nós também mudarmos a tarefa.Adicione a:

tasks.Add(Task.Factory.StartNew(async () => Work(),TaskCreationOptions.LongRunning));

O código funciona novamente, pois sabe criar novas tarefas em novos tópicos

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Generic;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        volatile int count = 0;
        [TestMethod]
        public async Task TestMethod1()
        {
            var tasks = new List<Task>();
            for(int i = 0;i<1000;i++)
            {
                tasks.Add(Work());
            }
            await Task.WhenAll(tasks.ToArray());
            Debug.WriteLine("finished");
        }

        async Task Work()
        {
            count++;
            Debug.WriteLine(count);
            await Task.Delay(10000);
            Debug.WriteLine(count);
            count--;
        }
    }
}
    
por 16.12.2017 / 11:30
fonte