5
votes

Optimisation du temps des tâches parallèles de longue durée

Intro

Je travaille avec une bibliothèque externe complexe, où j'essaye d'exécuter ses fonctionnalités sur une grande liste d'éléments. La bibliothèque n'expose pas une bonne interface asynchrone, je suis donc coincé avec du code assez démodé.

Mon objectif est d'optimiser le temps nécessaire pour terminer un lot de traitement et de démontrer le problème sans avoir à inclure la bibliothèque tierce réelle. J'ai créé une approximation du problème ci-dessous

Problème

Étant donné une action non asynchrone, où vous pouvez connaître la "taille" (c'est-à-dire la complexité) de l'action à l'avance:

class Program
{
    static void Main(string[] args)
    {
        MainAsync().GetAwaiter().GetResult();
        Console.ReadLine();
    }

    static async Task MainAsync()
    {
        var list = new List<IAction>();
        for (var i = 0; i < 200; i++) list.Add(new LongAction());
        for (var i = 0; i < 200; i++) list.Add(new MediumAction());
        for (var i = 0; i < 200; i++) list.Add(new ShortAction());


        var swSync = Stopwatch.StartNew();
        Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = 20 }, action =>
        {
            Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
            var sw = Stopwatch.StartNew();
            action.Execute();
            sw.Stop();
            Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
        });
        swSync.Stop();
        Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
    }
}


public interface IAction
{
    int Size { get; }
    void Execute();
}

public class LongAction : IAction
{
    public int Size => 10000;
    public void Execute()
    {
        Thread.Sleep(10000);
    }
}

public class MediumAction : IAction
{

    public int Size => 1000;
    public void Execute()
    {
        Thread.Sleep(1000);
    }
}

public class ShortAction : IAction
{
    public int Size => 100;
    public void Execute()
    {
        Thread.Sleep(100);
    }
}

Et étant donné qu'il existe 3 variantes de cette action:

public class LongAction : IAction
{
    public int Size => 10000;
    public void Execute()
    {
        Thread.Sleep(10000);
    }
}

public class MediumAction : IAction
{

    public int Size => 1000;
    public void Execute()
    {
        Thread.Sleep(1000);
    }
}

public class ShortAction : IAction
{
    public int Size => 100;
    public void Execute()
    {
        Thread.Sleep(100);
    }
}

Comment pouvez-vous optimiser une longue liste de ces actions, de sorte que lorsqu'elles sont exécutées de manière parallèle, le tout le lot se termine aussi vite que possible?

Naïvement, vous pouvez simplement lancer le lot entier sur un Parallel.ForEach , avec un parallélisme raisonnablement élevé et cela fonctionne certainement - mais il doit être un moyen de les planifier de manière optimale afin que certains des plus gros démarrent en premier.

Pour illustrer davantage le problème, si nous prenons un exemple super simplifié

  • 1 tâche de taille 10
  • 5 tâches de taille 2
  • 10 tâches de taille 1

Et 2 fils disponibles. Je pourrais trouver 2 (parmi tant d'autres) façons de planifier ces tâches (la barre noire étant le temps mort - rien à planifier):

 entrez la description de l'image ici p >

Il est clair que le premier se termine plus tôt que le second.

Code minimal complet et vérifiable

Code de test complet si quelqu'un a envie d'un bash (essayez de le rendre plus rapide que mon naïf implémentation ci-dessous):

public interface IAction
{
    int Size { get; }
    void Execute();
}

7 commentaires

Ces problèmes de type sont appelés «problèmes d'emballage» où vous essayez de réduire le temps ou le nombre de ressources nécessaires pour résoudre un problème. Il n'y a pas d'algorithme pour résoudre le problème sans essayer toutes les possibilités, mais il existe de nombreux livres blancs écrits avec un algorithme pour trouver de bonnes solutions.


Merci @jdweng, je vais examiner cela. Je n'avais pas associé ce problème au problème d'emballage mais je suppose que vous avez raison.


En regardant les informations supplémentaires (que je pense que vous avez ajoutées depuis que j'ai commencé ma réponse), je suppose que si vous savez combien de threads sont disponibles, vous pouvez préallouer des tâches en fonction de la taille


@Jamiec Parallel.ForEach est destiné au parallélisme des données, pas à l'exécution asynchrone. Il est généralement un adapté à votre cas. Il fonctionne en partitionnant à peu près autant de partitions que de cœurs et utilise une tâche par partition pour traiter ses données sans pénalités de synchronisation. Il utilise même le thread courant pour le traitement, c'est pourquoi il semble se bloquer.


@Jamiec une bien meilleure option serait un ActionBlock de la bibliothèque TPL Dataflow, avec un DoP spécifique. Un bloc d'action est grosso modo un worker qui lit les données de sa file d'attente d'entrée et les traite. Par défaut, le DOP est 1, car les blocs de flux de données sont destinés à être utilisés dans un pipeline, chaque bloc s'exécutant sur sa propre tâche.


@jamiec ordonner simplement les actions par taille décroissante permettrait à toute implémentation de travail de se tenir occupée. Vous attendez-vous à l'arrivée de gros travaux inattendus pendant le traitement? Vous pouvez utiliser des blocs de traitement séparés pour ce cas


@jamiec check Partitionneurs personnalisés pour PLINQ et TPL . Le scénario que vous décrivez est similaire au partitionnement par blocs et à l'équilibrage de charge. Équilibrage de charge sur des tableaux et des listes ordonnés est déjà disponible . J'ai oublié ça!


4 Réponses :


1
votes

Pour commencer, je dirais que le problème est le suivant:

Vous avez une liste d'entiers et un nombre limité d'étés. Vous voulez un algorithme qui additionne les nombres entiers dans les étés afin que la valeur maximale des étés soit le minimum possible.

Par exemple:

while (taskManager.HasTasks())
{
    task = taskManager.GetLongestTask();
    thread = threadManager.GetFreeThread(); // blocks if no thread available
    thread.Run(task);
}

Comme vous le pouvez voir le facteur de délimitation est la tâche la plus longue. Les plus courts sont facilement servis en parallèle ou en moins de temps. C'est similaire à Knapsack, mais à la fin se résume à un très simple "premier" ordre des tâches.

Le pseudo code (avec mes classes inventées ) serait:

list = 1, 4, 10, 2, 3, 4
summers = 3

summer(1): 10
summer(2): 4 + 4
summer(3): 3 + 2 + 1

Ceci est juste du pseudo-code, pas parallèle / asynchrone et des blocs. J'espère que vous pourrez en tirer quelque chose d'utile.


0 commentaires

1
votes

Eh bien, cela dépend. Sur mon matériel, votre exemple artificiel (modifié pour que le sommeil soit 1000,100 et 10 ms car je n'ai pas toute la journée) est ~ 30% plus rapide (~ 15s vs ~ 22s) si je change simplement la boucle pour exécuter toutes les tâches longues d'abord:

Parallel.ForEach(list.OrderByDescending(l=>l.Size), action => ...

Mais bien sûr, cela dépend entièrement de la charge de ces tâches. Si deux tâches différentes utilisent fortement la même ressource (par exemple, une base de données partagée), il peut y avoir des gains très limités à exécuter ces deux tâches en parallèle car elles finiront par se verrouiller l'une l'autre pendant des périodes à un certain niveau.

Je suggérerais que vous ayez besoin d'une analyse plus approfondie, puis de regrouper les tâches en fonction de leur capacité à être parallèles en fonction de ce qu'elles font réellement, et d'essayer de vous assurer d'en exécuter autant des threads parallèles avec autant de tâches «compatibles» que possible ... Et bien sûr, si une tâche particulière semble toujours prendre autant de temps que toutes les autres assemblées, assurez-vous que l'on commence en premier ....

Très difficile de donner de meilleurs conseils avec les détails donnés ici.


4 commentaires

Il n'y a pas de base de données ou autre ressource partagée (sauf CPU bien sûr), toutes les tâches sont complètement indépendantes


Notez que Parallel.ForEach ne conserve pas l'ordre dans la liste d'entrée - le tri par taille de tâche ne tiendra pas.


@YoniV ce n'est pas forcément vrai, on peut spécifier un partitionneur personnalisé qui se chargera également du classement, voire implémentera un algorithme de packaging.


@YoniV - Je n'étais pas sûr que ce soit le cas, mais le changement que j'ai décrit ci-dessus a définitivement fait une différence. aurait bien sûr pu être de la chance, mais j'ai fait 3 courses avant qui étaient toutes autour de 22s, puis 3 courses après lesquelles étaient autour de 15s. J'accepte que ce comportement puisse changer avec les versions du compilateur, etc. mais je ne vois pas vraiment pourquoi Parallel ne retirerait pas simplement les éléments de la liste dans l'ordre dans lequel ils viennent et leur allouerait des threads, et si vous l'avez passé ce que je suppose est probablement un IOrderedEnumerable<> ou similaire, je ne vois pas pourquoi il n'obtiendrait pas les tâches dans cet ordre tant que tous les threads ne seraient pas alloués ...



1
votes

Le tri par taille de tâche dans l'ordre décroissant, puis l'utilisation de TaskFactory pour exécuter chacune dans une tâche différente a réduit considérablement le temps d'exécution. Le niveau de parallélisme est resté 20. Les résultats ont été: 114 676 ms contre 193 713 ms dans votre échantillon d'origine. (~ 40% d'amélioration)

EDIT: Dans votre exemple spécifique, la liste est de toute façon triée dès le départ, mais Parallel.ForEach ne conserve pas l'ordre d'entrée.

static async Task MainAsync()
{
    var list = new List<IAction>();
    for (var i = 0; i < 200; i++) list.Add(new LongAction());
    for (var i = 0; i < 200; i++) list.Add(new MediumAction());
    for (var i = 0; i < 200; i++) list.Add(new ShortAction());

    Console.WriteLine("Sorting...");
    list.Sort((x, y) => y.Size.CompareTo(x.Size));
    int totalTasks = 0;

    int degreeOfParallelism = 20;
    var swSync = Stopwatch.StartNew();
    using (SemaphoreSlim semaphore = new SemaphoreSlim(degreeOfParallelism))
    {
        foreach (IAction action in list)
        {
            semaphore.Wait();
            Task.Factory.StartNew(() =>
            {
                try
                {
                    Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Starting action {action.GetType().Name} on thread {Thread.CurrentThread.ManagedThreadId}");
                    var sw = Stopwatch.StartNew();
                    action.Execute();
                    sw.Stop();
                    Console.WriteLine($"{DateTime.Now:HH:mm:ss}: Finished action {action.GetType().Name} in {sw.ElapsedMilliseconds}ms on thread {Thread.CurrentThread.ManagedThreadId}");
                }
                finally
                {
                    totalTasks++;
                    semaphore.Release();
                }
            });
        }

        // Wait for remaining tasks....
        while (semaphore.CurrentCount < 20)
        { }

        swSync.Stop();
        Console.WriteLine($"Done in {swSync.ElapsedMilliseconds}ms");
        Console.WriteLine("Performed total tasks: " + totalTasks);
    }
}


4 commentaires

Il n'y a aucune raison d'utiliser des tâches brutes et des sémaphores pour spécifier un DOP. Les classes de niveau supérieur dans le TPL (PLINQ, Dataflow, Parallel) fonctionnent à l'aide de tâches, pas de threads.


@PanagiotisKanavos pouvez-vous élaborer - Parallel.ForEach ne semble pas honorer la commande.


@Jamiec comme je l'ai dit, Parallel est inapproprié en premier lieu. Ce n'est pas un problème de parallélisme des données. Parallel partitionnera les données avant le traitement, ce qui signifie que des actions coûteuses peuvent se retrouver dans la même partition.


@Jamiec Vous pouvez éviter que par spécifiant votre propre partitionneur mais j'utiliserais d'abord un ActionBlock avec un grand DOP pour garder les choses simples - N travailleurs tirant de la même file d'attente d'entrée, et alimenter en premier les actions les plus coûteuses



1
votes

Une solution relativement rapide et sale consiste à utiliser un partitionneur d'équilibrage de charge au-dessus d'une liste d'actions triée par taille décroissante

var sorted = list.OrderByDescending(a => a.Size);
var partitioner=Partitioner.Create(sorted,EnumerablePartitionerOptions.NoBuffering);

En utilisant seulement ces deux lignes, les performances s'améliorent d'environ 30%, tout comme les autres réponses.

PLINQ partitionne les données et utilise une tâche distincte pour traiter une partition entière à la fois . Lorsque la taille d'entrée est connue, comme c'est le cas avec les tableaux et listes dérivés d'IList, l'entrée est partitionnée en morceaux de taille égale et envoyée à chaque tâche de travail.

Lorsque la taille n'est pas connue, comme C'est le cas avec les méthodes d'itération, les requêtes LINQ, etc. PLINQ utilise le partitionnement par blocs. Un morceau de données est récupéré à la fois et transmis aux tâches de travail.

Une autre option, que j'avais oubliée, est l ' équilibrage de charge sur le partitionnement de chunck supérieur. Cela applique le partitionnement de blocs à l'aide de petits morceaux aux tableaux et à l'entrée dérivée d'IList. L'équilibrage de charge Partitioner.Create renvoient des instances de OrderablePartitioner, de sorte que l'ordre des éléments IAction est préservé

La même chose peut être obtenue avec un IEnumerable source en spécifiant l'option EnumerablePartitionerOptions.NoBuffering :

var sorted = list.OrderByDescending(a => a.Size).ToArray();
var partitioner=Partitioner.Create(sorted, loadBalance:true);

Parallel.ForEach(partitioner, options, action =>...);

Cela créera un OrderablePartitioner qui utilise le codage par blocs


1 commentaires

C'était définitivement le gagnant. Avec mon test sur une liste non ordonnée, le temps était d'environ 14 secondes. l'utilisation d'un partitionneur avec loadBalance: true réduit cela à 9s. Merci beaucoup pour votre aide.