In an application I’m working on, I had a need for a method to write a single object to several Lucene indexes in order to support different types of searches. I ended up with a method that looks like this:

C#
1
2
3
4
5
6
7
8
9
public static void Accept<T>(T to_be_indexed) 
{
    var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
    
    foreach (var x in indexers)
    {
        x.Index(to_be_indexed);
    }
}
public static void Accept<T>(T to_be_indexed) 
{
	var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
	
	foreach (var x in indexers)
	{
		x.Index(to_be_indexed);
	}
}

Simple enough. Because of all the disk I/O involved, it seemed like this would be a good candidate for processing concurrently. I tried a few different options, but found manually creating an array of threads too awkward, and had some trouble getting the ThreadPool to wait properly. I was talking about this with Matt and he showed me a small class that he used for just this type of scenario called Countdown.

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private volatile int current;
 
    public Countdown(int total)
    {
        current = total;
        done = new ManualResetEvent(false);
    }
 
    public void Signal()
    {
        lock (done)
        {
            if (current > 0 && --current == 0)
                done.Set();
        }
    }
 
    public void Wait()
    {
        done.WaitOne();
    }
 
    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
} 
public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private volatile int current;

    public Countdown(int total)
    {
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        lock (done)
        {
            if (current > 0 && --current == 0)
                done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
} 

I found this class much more predictable and easier to work with than a bunch of ManualResetEvents, so we started looking at how it could be used to solve the problem at hand. It was remarkably easy, and I can only recall one issue that we ran into. The end result looked like this:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void Accept<T>(T to_be_indexed) 
{
    var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
    
    using (var cd = new Countdown( indexers.Count ))
    {
        foreach (var current in indexers)
        {
            var captured = current;
            ThreadPool.QueueUserWorkItem(x =>
                {
                    captured.Index(to_be_indexed);
                    cd.Signal();
                });
        }
        cd.Wait();
    }
}
public static void Accept<T>(T to_be_indexed) 
{
	var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
	
	using (var cd = new Countdown( indexers.Count ))
    {
        foreach (var current in indexers)
        {
            var captured = current;
            ThreadPool.QueueUserWorkItem(x =>
                {
                    captured.Index(to_be_indexed);
                    cd.Signal();
                });
        }
        cd.Wait();
    }
}

The problem we ran into was the lambda expression grabbing whatever “current” was defined as at execution time, causing the same index to be written to twice (and others not at all) in many cases. This is what forced us to add the line
var captured = current;
and use captured rather than current within the lambda. I’ve gotta say, I really like the all the code in that using block for the countdown. It may be because I set rather low expectations with my hackish early attempts to get this done, but I like things very simple and this definitely fits the bill. But, as beautiful / simple as the code may be, I still don’t want to write it multiple times if I can avoid it. So I took a look at the signature for Parallel.ForEach in .net 4 and threw together a version that would work with my system (running on mono 2.4.2.3, mostly equivalent to .net 3.5). Here that is:

C#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> action)
{
    using (var cd = new Countdown( enumerable.Count() ))
    {
        foreach (var current in enumerable)
        {
            var captured = current;
            ThreadPool.QueueUserWorkItem(x =>
                {
                    action.Invoke(captured);
                    cd.Signal();
                });
        }
        cd.Wait();
    }
}
public static void ForEach<T>(IEnumerable<T> enumerable, Action<T> action)
{
    using (var cd = new Countdown( enumerable.Count() ))
    {
        foreach (var current in enumerable)
        {
            var captured = current;
            ThreadPool.QueueUserWorkItem(x =>
                {
                    action.Invoke(captured);
                    cd.Signal();
                });
        }
        cd.Wait();
    }
}

The only real changes were to use the Count() method on IEnumerable rather than the Count property on IList, and the change from
captured.Index(to_be_indexed);
to
action.Invoke(captured);
to allow execution of whatever method was passed in.

Then, the Accept method can be changed to look like this:

C#
1
2
3
4
5
6
7
8
public static void Accept<T>(T to_be_indexed) 
{
    var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
    
    Parallel.ForEach(indexers, x => {
        x.Index(to_be_indexed); 
    });
}
public static void Accept<T>(T to_be_indexed) 
{
	var indexers = StructureMap.ObjectFactory.GetAllInstances<Indexer<T>>();
	
	Parallel.ForEach(indexers, x => {
		x.Index(to_be_indexed);	
	});
}

I realize that this is a very naive implementation, and that Parallel.ForEach in .net 4 is probably doing things under the hood that I haven’t even considered (optimizing for different number of CPU’s and such) but I think / hope that the ThreadPool takes care of some of that for me. I’d love to hear any comments on how to make it better though.

For another implementation, check out this Code Project article: Poor Man’s Parallel ForEach. We tested this implementation briefly (and far from scientifically), but found that the ThreadPool behaved a bit more consistently, was often a bit faster, and also required much less code to implement.