#include <ddk/ntddk.h>
#include <internal/ke.h>
#include <internal/id.h>
+#include <internal/ps.h>
#define NDEBUG
#include <internal/debug.h>
sizeof(KQUEUE)/sizeof(ULONG),
0);
InitializeListHead(&Queue->EntryListHead);
- InitializeListHead(&Queue->ThreadListEntry);
- Queue->CurrentCount = 0;
- Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
+ InitializeListHead(&Queue->ThreadListHead);
+ Queue->RunningThreads = 0;
+ Queue->MaximumThreads = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
}
#ifndef LIBCAPTIVE
LONG STDCALL
+KiInsertQueue(
+ IN PKQUEUE Queue,
+ IN PLIST_ENTRY Entry,
+ BOOLEAN Head
+ )
+{
+ ULONG InitialState;
+
+ DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
+
+ KeAcquireDispatcherDatabaseLock(FALSE);
+
+ InitialState = Queue->Header.SignalState;
+ Queue->Header.SignalState++;
+
+ if (Head)
+ {
+ InsertHeadList(&Queue->EntryListHead, Entry);
+ }
+ else
+ {
+ InsertTailList(&Queue->EntryListHead, Entry);
+ }
+
+ if (Queue->RunningThreads < Queue->MaximumThreads && InitialState == 0)
+ {
+ KeDispatcherObjectWake(&Queue->Header);
+ }
+
+ KeReleaseDispatcherDatabaseLock(FALSE);
+ return InitialState;
+}
+
+
+
+LONG STDCALL
KeInsertHeadQueue(IN PKQUEUE Queue,
IN PLIST_ENTRY Entry)
{
- UNIMPLEMENTED;
- return 0;
+ return KiInsertQueue(Queue,Entry,TRUE);
}
KeInsertQueue(IN PKQUEUE Queue,
IN PLIST_ENTRY Entry)
{
- UNIMPLEMENTED;
- return 0;
+ return KiInsertQueue(Queue,Entry,FALSE);
}
IN KPROCESSOR_MODE WaitMode,
IN PLARGE_INTEGER Timeout OPTIONAL)
{
- UNIMPLEMENTED;
- return NULL;
+ PLIST_ENTRY ListEntry;
+ NTSTATUS Status;
+ PKTHREAD Thread = KeGetCurrentThread();
+
+ KeAcquireDispatcherDatabaseLock(FALSE);
+
+ //assiciate new thread with queue?
+ if (Thread->Queue != Queue)
+ {
+ //remove association from other queue
+ if (!IsListEmpty(&Thread->QueueListEntry))
+ {
+ RemoveEntryList(&Thread->QueueListEntry);
+ }
+
+ //associate with this queue
+ InsertHeadList(&Queue->ThreadListHead, &Thread->QueueListEntry);
+ Queue->RunningThreads++;
+ Thread->Queue = Queue;
+ }
+
+ if (Queue->RunningThreads <= Queue->MaximumThreads && !IsListEmpty(&Queue->EntryListHead))
+ {
+ ListEntry = RemoveHeadList(&Queue->EntryListHead);
+ Queue->Header.SignalState--;
+ KeReleaseDispatcherDatabaseLock(FALSE);
+ return ListEntry;
+ }
+
+ //need to wait for it...
+ KeReleaseDispatcherDatabaseLock(FALSE);
+
+ Status = KeWaitForSingleObject(Queue,
+ WrQueue,
+ WaitMode,
+ TRUE,//Alertable,
+ Timeout);
+
+ if (Status == STATUS_TIMEOUT || Status == STATUS_USER_APC)
+ {
+ return (PVOID)Status;
+ }
+ else
+ {
+ KeAcquireDispatcherDatabaseLock(FALSE);
+ ListEntry = RemoveHeadList(&Queue->EntryListHead);
+ KeReleaseDispatcherDatabaseLock(FALSE);
+ return ListEntry;
+ }
+
}
PLIST_ENTRY STDCALL
KeRundownQueue(IN PKQUEUE Queue)
{
- UNIMPLEMENTED;
- return NULL;
+ PLIST_ENTRY EnumEntry;
+ PKTHREAD Thread;
+
+ DPRINT("KeRundownQueue(Queue %x)\n", Queue);
+
+ //FIXME: should we wake thread waiting on a queue?
+
+ KeAcquireDispatcherDatabaseLock(FALSE);
+
+ // Clear Queue and QueueListEntry members of all threads associated with this queue
+ while (!IsListEmpty(&Queue->ThreadListHead))
+ {
+ EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
+ InitializeListHead(EnumEntry);
+ Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
+ Thread->Queue = NULL;
+ }
+
+ if (!IsListEmpty(&Queue->EntryListHead))
+ EnumEntry = Queue->EntryListHead.Flink;
+ else
+ EnumEntry = NULL;
+
+ KeReleaseDispatcherDatabaseLock(FALSE);
+
+ return EnumEntry;
}
#endif /* LIBCAPTIVE */