Inside this array, the CWorker class creates an implementation
of the CWorkerThread class. CWorkerThread
A class (discussed below) is an abstract class that must be inherited. The exported
class defines how messages are processed:
aThreads = new ArrayList();
for
(int idx=0; idx〈sfWorker.NumberThreads; idx++)
{
WorkerThreadFormatter
wfThread = new WorkerThreadFormatter();
wfThread.PRocessName = sfWorker.ProcessName;
wfThread.ProcessDesc = sfWorker.ProcessDesc;
wfThread.ThreadNumber = idx;
wfThread.InputQueue = sfWorker.InputQueue;
wfThread.ErrorQueue =
sfWorker.ErrorQueue;
wfThread.OutputName = sfWorker.OutputName;
//
Define the auxiliary type and insert it into the auxiliary thread structure
CWorkerThread wtBase;
switch
(sfWorker.ProcessType)
{
case
WorkerFormatter.SFProcessType.ProcessRoundRobin:
wtBase = new
CWorkerThreadRoundRobin(this, wfThread);
break;
case
WorkerFormatter.SFProcessType.ProcessAppSpecific:
wtBase = new
CWorkerThreadAppSpecific(this, wfThread);
break;
case
WorkerFormatter.SFProcessType.ProcessAssembly:
wtBase = new
CWorkerThreadAssembly(this, wfThread);
break;
default:
throw new Exception("Unknown Processing Type");
}
// Add a call to the array
aThreads.Insert(idx, wtBase);
}
Once all objects have been created, you can start them by calling the Start method of each thread object:
foreach(CWorkerThread
cThread in aThreads)
cThread.Start();
Stop method has the following garbage collection operations:
GC.SuppressFinalize(this);
. If Stop is called
method, there will be no need to parse the
constructor. The SuppressFinalize method prevents calling the object's Finalize method (the actual implementation of the constructor).
CWorkerThread abstract class
CWorkerThread is a class composed of CWorkerThreadAppSpecifc, CWorkerThread
RoundRobin and Abstract class inherited by CWorkerThreadAssembly. No matter how you handle the message, most of the processing of the queue is the same, so the CWorkerThread class provides this functionality.This class provides abstract methods (which must be replaced by real methods) to manage resources and handle messages.
The work of the class is once again accomplished through the Start, Stop, Pause and Continue methods.
The input and error queues are referenced in the Start method. exist
In the .NET framework, messages are handled by the System.
Messaging namespace:
// Try to open the queue and set the default read and write properties
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue
mqError = new MessageQueue(sErrorQueue);
// If using MSMQ COM, set the formatter to ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter =
new ActiveXMessageFormatter();
Once the message queue reference is defined, a thread is created for the actual processing function
(called ProcessMessages). In the .NET Framework, use
System.Threading
namespace makes it easy to implement threading:
procMessage = new Thread(new
ThreadStart(ProcessMessages));
The ProcessMessages function is a processing loop based on Boolean values. When the value is set to
False, the processing loop will terminate. Therefore, the thread object's Stop
The method only sets this Boolean
value, then closes the open message queue, and joins the thread with the main thread:
// Join the service thread and processing thread
procMessage.Join();
// Close the open message queue
mqInput.Close();
mqError.Close();
Pause method only sets a Boolean value to make the processing thread sleep for half a second:
if (bPause)
Thread.Sleep(500);
Finally, each Start, Stop, Pause and Continue methods will call abstract
OnStart, OnStop, OnPause and OnContinue method. These abstract methods provide hooks for classes implementing to capture and release required resources.
●Receive Message.
● If the Message has a successful Receive, the abstract ProcessMessage method is called.
●If Receive or ProcessMessage fails, send the Message to the error queue.
Message mInput;
try
{
// Read from the queue and wait for 1 second
mInput =
mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException
mqe)
{
// Set the message to null
mInput = null;
// Check the error code to see if it timed out
if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
{
//
If it does not time out, issue an error and log the error number
LogError("Error: " + mqe.Message);
throw mqe;
}
}
if (mInput != null)
{
// Get a message to be processed and call the message processing abstract method
try
{
ProcessMessage(mInput);
}
// Capture errors with known exception status
catch (CWorkerThreadException ex)
{
ProcessError(mInput,
ex.Terminate);
}
// Catch unknown exceptions and call Terminate
catch
{
ProcessError(mInput, true);
}
}
The ProcessError method sends the error message to the error queue. In addition, it may also lead to
Send an exception to terminate the thread. It will do this if the ProcessMessage method throws a termination error or CWorker
ThreadException type.
CworkerThread exported class
Any class that inherits from CWorkerThread must provide OnStart, OnStop, On
Pause, OnContinue and
ProcessMessage method. The OnStart and OnStop methods acquire and release processing resources. OnPause and OnContinue
Methods allow temporary release and reacquisition of these resources. The ProcessMessage method should process the message and raise
when a failure event occurs
CWorkerThreadException exception.
Since the CWorkerThread constructor defines runtime parameters, the derived class must call the base class
public
CWorkerThreadDerived(CWorker v_cParent, WorkerThread
Formatter v_wfThread)
: base (v_cParent, v_wfThread) {}
The exported class provides two types of processing: sending the message to another queue, or calling the component method. Both implementations of receiving and sending messages use looping techniques or application offsets (keeping
should include a list of queue paths. Implemented OnStart and
The OnStop method
should open and close references to these queues:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx=0; idx〈iQueues;
idx++)
{
mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}
In these scenarios, message processing is simple: send the message to the necessary output queue. In the case of
loop, this process is:
{
mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
// Forced termination exception if error
throw new CWorkerThreadException(ex.Message, true);
}
// Calculate the next queue number
iNextQueue++;
iNextQueue %= iQueues;
The latter method of calling a component with message parameters is more interesting. ProcessMessage
The
method uses the IWebMessage interface to call a .NET component. The OnStart and OnStop methods obtain and release the reference to this component.
The configuration file in this solution should contain two items: the complete class name and the
location of the file where the class is located. According to the definition in the IWebMessage interface, call it on the component
Process method.
The number requires an assembly type. Here it is exported from the assembly file path and class name.
Once the object reference is obtained, it will be put into the appropriate interface:
private string sFilePath, sTypeName;
//
Save assembly path and type name
sFilePath = wfThread.OutputName[0];
sTypeName =
wfThread.OutputName[1];
// Get a reference to the necessary object
Assembly asmSample =
Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// Define the necessary interface for the object
iwmSample = (IWebMessage)objSample;
After obtaining the object reference, the ProcessMessage method will call the
Process method on the IWebMessage interface:
WebMessageReturn wbrSample;
try
{
// Define the parameters of method call
string
sLabel = v_mInput.Label;
string sBody = (string)v_mInput.Body;
int
iAppSpecific = v_mInput.AppSpecific;
// Call the method and capture the return code
wbrSample =
iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch
(InvalidCastException ex)
{
// If an error occurs in the message content, force a non-terminating exception to be issued
throw
new CWorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
// If the assembly is called incorrectly, force a termination exception
throw new
CWorkerThreadException(ex.Message, true);
}
// If there is no error, check the return status of the object call
switch (wbrSample)
{
case WebMessageReturn.ReturnBad:
throw
new CWorkerThreadException
("Unable to process message: Message marked
bad", false);
case WebMessageReturn.ReturnAbort:
throw new
CWorkerThreadException
("Unable to process message: Process
terminating", true);
default:
break;
}
The provided sample component writes the message body to a database table. If a serious database error is captured
Error, you may want to terminate the process, but here, just mark the message as an error message
.
Since the class instance created in this example may acquire and retain expensive database resources, use the OnPause and OnContinue methods to release and reacquire object references.
status of the application. . NET Framework greatly simplifies integrating event logs, performance counters, and Windows management instrumentation (WMI
) into the application process. The messaging application uses time logs and performance counters, both from the System.Diagnostics assembly.
In the ServiceBase class, you can automatically enable event logging. Additionally, the ServiceBase
EventLog member supports writing to application event logs:
For applications that write to the event log rather than the application log, it is easy to
create and get a reference to the EventLog resource (as described in
the same as done in the CWorker class),
private EventLog cLog;
string sSource = ServiceControl.ServiceControlName;
string sLog = "application";
//
Check if the source exists, if not, create the source
if (!EventLog.SourceExists(sSource))
EventLog.CreateEventSource(sSource, sLog);
// Create a log object and reference the currently defined source
cLog =
new EventLog();
cLog.Source = sSource;
// Write an entry in the log to indicate successful creation
cLog.WriteEntry("Created successfully", EventLogEntryType.Information);
The .NET framework greatly simplifies performance counters. The messaging application provides counters for each processing thread, user of thread export
and the entire application to track messages.
and then increment the corresponding counter instance.
The category of performance counters is defined in the service OnStart method. These categories represent two counters - the total number of messages and the number of messages processed per second:
CounterCreationData[] cdMessage = new CounterCreationData[2];
Messages
Processed",
PerformanceCounterType.NumberOfItems64);
cdMessage[1] = new
CounterCreationData("Messages/Second",
"Messages Processed a Second",
PerformanceCounterType.RateOfChangePerSecond32);
PerformanceCounterCategory.Create("MSDN Message Service", "MSDN
Message
Service Counters", cdMessage);
Once the performance counter category is defined, a PerformanceCounter object is created to access
Ask about counter instance function. A PerformanceCounter object requires a category, counter name, and an optional instance name. For the helper process, the process name from the xml file will be used, the code is as follows:
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",
pcMsgSecWorker = new
PerformanceCounter("MSDN Message Service",
"Messages/Second", sProcessName);
pcMsgTotWorker.RawValue = 0;
pcMsgSecWorker.RawValue = 0;
To increment the counter value, just call the appropriate method:
pcMsgTotWorker.IncrementBy(1);
pcMsgSecWorker.IncrementBy(1);
Finally, when the service is terminated, the installed performance counter category should be deleted from the system:
PerformanceCounterCategory.Delete("MSDN Message Service");
Because performance counters work in the .NET Framework, a special service needs to be running.
This service (PerfCounterService) provides shared memory. Counter information is written to shared
memory and read by the performance counter system.
Installation
Before we end, let’s briefly introduce installation and the
installation tool called installutil.exe. Since this application is
Windows service, it must be installed using installutil.exe
. Therefore, one needs to use one from System.Configuration.Install
Installer class inherited in assembly
:
public class ServiceRegister: Installer
{
private ServiceInstaller serviceInstaller;
private ServiceProcessInstaller
processInstaller;
public ServiceRegister()
{
// Create a service installer
serviceInstaller = new ServiceInstaller();
serviceInstaller.StartType = ServiceStart.Manual;
serviceInstaller.ServiceName = ServiceControl.ServiceControl
Name;
serviceInstaller.DisplayName = ServiceControl.ServiceControl
Desc;
Installers.Add(serviceInstaller);
// Create process installer
processInstaller = new ServiceProcessInstaller();
processInstaller.RunUnderSystemAccount = true;
Installers.Add(processInstaller);
}
}
As shown in this sample class, for a Windows service, the service and the service process each require an installer to define the account under which the service runs. Other installers allow registering event logs and
Performance counters and other resources.
programmers can now be implemented using simple object-oriented programs. exhaust
Although our focus is on C#, the content described in this article also applies to Visual Basic and
Managed C++. The new .NET
The framework enables developers to create powerful, scalable Windows applications and services using any programming language.
The new .NET Framework not only simplifies and expands programming possibilities, but also makes it easy to integrate
Incorporated into the app. Although the application here does not use Windows Management Detection Devices
(WMI), the .NET Framework can apply it as well.
The above is the content of C# Message Queue Application-2. For more related articles, please pay attention to the PHP Chinese website (www.php.cn)!