Fennel: /home/pub/open/dev/fennel/device/IoCompletionPortScheduler.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 00026 #ifdef MSVC 00027 00028 #include "fennel/device/RandomAccessDevice.h" 00029 #include "fennel/device/IoCompletionPortScheduler.h" 00030 #include "fennel/device/DeviceAccessSchedulerParams.h" 00031 #include "fennel/common/SysCallExcn.h" 00032 #include "fennel/synch/Thread.h" 00033 00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $"); 00035 00036 class IoCompletionPortThread : public Thread 00037 { 00038 IoCompletionPortScheduler &scheduler; 00039 public: 00040 IoCompletionPortThread(IoCompletionPortScheduler &schedulerInit) 00041 : scheduler(schedulerInit) 00042 { 00043 } 00044 virtual void run(); 00045 }; 00046 00047 IoCompletionPortScheduler::IoCompletionPortScheduler( 00048 DeviceAccessSchedulerParams const &params) 00049 { 00050 quit = false; 00051 00052 hCompletionPort = CreateIoCompletionPort( 00053 INVALID_HANDLE_VALUE, 00054 NULL, 00055 0, 00056 params.nThreads); 00057 if (hCompletionPort) { 00058 throw SysCallExcn("CreateIoCompletionPort failed for scheduler"); 00059 } 00060 00061 for (uint i = 0; i < params.nThreads; ++i) { 00062 IoCompletionPortThread *pThread = new IoCompletionPortThread(*this); 00063 pThread->start(); 00064 threads.push_back(pThread); 00065 } 00066 } 00067 00068 IoCompletionPortScheduler::~IoCompletionPortScheduler() 00069 { 00070 assert(isStarted()); 00071 if (!CloseHandle(hCompletionPort)) { 00072 throw SysCallExcn("CloseHandle failed for IoCompletionPort"); 00073 } 00074 } 00075 00076 bool IoCompletionPortScheduler::schedule(RandomAccessRequest &request) 00077 { 00078 assert(isStarted()); 00079 00080
00081 00082 FileSize cbOffset = request.cbOffset; 00083 RandomAccessRequest::BindingListMutator bindingMutator(request.bindingList); 00084 while (bindingMutator) { 00085 RandomAccessRequestBinding *pBinding = bindingMutator.detach(); 00086 LARGE_INTEGER largeInt; 00087 largeInt.QuadPart = cbOffset; 00088 pBinding->Offset = largeInt.LowPart; 00089 pBinding->OffsetHigh = largeInt.HighPart; 00090 BOOL bCompleted; 00091 if (request.type == RandomAccessRequest::READ) { 00092 bCompleted = ReadFile( 00093 HANDLE(request.pDevice->getHandle()), 00094 pBinding->getBuffer(), 00095 pBinding->getBufferSize(), 00096 NULL, 00097 pBinding); 00098 } else { 00099 bCompleted = WriteFile( 00100 HANDLE(request.pDevice->getHandle()), 00101 pBinding->getBuffer(), 00102 pBinding->getBufferSize(), 00103 NULL, 00104 pBinding); 00105 } 00106 if (!bCompleted) { 00107 if (GetLastError() != ERROR_IO_PENDING) { 00108 pBinding->notifyTransferCompletion(false); 00109 } 00110 } 00111 cbOffset += pBinding->getBufferSize(); 00112 } 00113 assert(cbOffset == request.cbOffset + request.cbTransfer); 00114 00115 return true; 00116 } 00117 00118 void IoCompletionPortScheduler::stop() 00119 { 00120 assert(isStarted()); 00121 00122 quit = true; 00123 00124
00125
00126 for (uint i = 0; i < threads.size(); ++i) { 00127 if (!PostQueuedCompletionStatus(hCompletionPort,0,0,NULL)) { 00128 throw SysCallExcn("PostQueuedCompletionStatus failed"); 00129 } 00130 } 00131 00132 for (uint i = 0; i < threads.size(); ++i) { 00133 threads[i]->join(); 00134 deleteAndNullify(threads[i]); 00135 } 00136 threads.clear(); 00137 } 00138 00139 void IoCompletionPortThread::run() 00140 { 00141 DWORD cbTransfer; 00142 ULONG_PTR pUnused; 00143 OVERLAPPED *pOverlapped; 00144 for (;;) { 00145 BOOL rc = GetQueuedCompletionStatus( 00146 scheduler.hCompletionPort, 00147 &cbTransfer, 00148 &pUnused, 00149 &pOverlapped, 00150 INFINITE); 00151 if (scheduler.quit) { 00152 return; 00153 } 00154 RandomAccessRequestBinding *pBinding = 00155 static_cast<RandomAccessRequestBinding *>(pOverlapped); 00156 if (rc) { 00157 assert(cbTransfer == pBinding->getBufferSize()); 00158 } 00159 pBinding->notifyTransferCompletion(rc); 00160 } 00161 } 00162 00163 void IoCompletionPortScheduler::registerDevice( 00164 SharedRandomAccessDevice pDevice) 00165 { 00166 int hFile = pDevice->getHandle(); 00167 if (hFile == -1) { 00168 return; 00169 } 00170 if (!CreateIoCompletionPort( 00171 HANDLE(hFile), 00172 hCompletionPort, 00173 0, 00174 threads.size())) 00175 { 00176 throw SysCallExcn("CreateIoCompletionPort failed for device"); 00177 } 00178 00179
00180 } 00181 00182 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/device/IoCompletionPortScheduler.cpp#10 $"); 00183 00184 #endif 00185 00186