private void button3_Click(object sender, EventArgs e) { Thread tCreater = new Thread(new ThreadStart(Creater)); tCreater.Start(); Thread tC1 = new Thread(new ThreadStart(ConsumeBill)); tC1.Name = "tc1"; tC1.Start(); Thread tC2 = new Thread(new ThreadStart(ConsumeBill)); tC2.Name = "tc2"; tC2.Start(); Thread tC3 = new Thread(new ThreadStart(ConsumeBill)); tC3.Name = "tc3"; tC3.Start(); } DataTable dt = null; public static object syncObj = new object(); public void Creater() { while (true) { if (dt == null || dt.Rows.Count == 0) { Console.WriteLine("生产者线程进入"); Monitor.Enter(syncObj); Console.WriteLine("生产者开始生产产品"); RecCityComplexPayBillApplyBll bll = new RecCityComplexPayBillApplyBll(); dt = bll.TableBySearch(); Console.WriteLine("生产者线程通知消费者继续"); Monitor.Pulse(syncObj); Console.WriteLine("生产者线程进等待"); Monitor.Wait(syncObj); Monitor.Exit(syncObj); } } } public static object syncConsume = new object(); public void ConsumeBill() { while (true) { if (dt != null && dt.Rows.Count > 0) { Monitor.Enter(syncConsume); Console.WriteLine("消费者者线程进入:" + Thread.CurrentThread.Name); if (dt != null && dt.Rows.Count > 0) { Console.WriteLine("消费者取出产品:" + Thread.CurrentThread.Name); DataRow dr = dt.Rows[0]; RecCityComplexPayBillApplyInfo info = new RecCityComplexPayBillApplyInfo(); info.BaseImportByRow(dr); dt.Rows.Remove(dr); Console.WriteLine("消费者线程退出" + Thread.CurrentThread.Name); if(dt.Rows.Count>0) { Monitor.Exit(syncConsume); //自行实现process处理,先释放资源,然后再处理,一边其他线程能继续处理其他记录 process(info); } else { //自行实现process处理, process(info); Monitor.Enter(syncObj); //通知生产者线程 Console.WriteLine("消费者者线程通知生产者产品:" + Thread.CurrentThread.Name); Monitor.Pulse(syncObj); //消费者独占消费资源并等待处理线程处理记录,以便所有线程都能处理完成(偶尔会出现处理未结束,生产者已经开始,但能最大限度的保证重复处理) //所有消费者线程等待 Console.WriteLine("消费者者线程等待生产者产生产品wait:" + Thread.CurrentThread.Name); Monitor.Wait(syncObj); Console.WriteLine("消费者者线程开始执行:" + Thread.CurrentThread.Name); Monitor.Exit(syncObj); Monitor.Exit(syncConsume); } } else { //系统不会进到该出 Monitor.Exit(syncConsume); } } }