00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "FileReadThread.h"
00017
00018 #include "base/Profiler.h"
00019
00020 #include <iostream>
00021 #include <unistd.h>
00022
00023
00024
00025 FileReadThread::FileReadThread() :
00026 m_nextToken(0),
00027 m_exiting(false)
00028 {
00029 }
00030
00031 void
00032 FileReadThread::run()
00033 {
00034 m_mutex.lock();
00035
00036 while (!m_exiting) {
00037 if (m_queue.empty()) {
00038 m_condition.wait(&m_mutex, 1000);
00039 } else {
00040 process();
00041 }
00042 notifyCancelled();
00043 }
00044
00045 notifyCancelled();
00046 m_mutex.unlock();
00047
00048 #ifdef DEBUG_FILE_READ_THREAD
00049 std::cerr << "FileReadThread::run() exiting" << std::endl;
00050 #endif
00051 }
00052
00053 void
00054 FileReadThread::finish()
00055 {
00056 #ifdef DEBUG_FILE_READ_THREAD
00057 std::cerr << "FileReadThread::finish()" << std::endl;
00058 #endif
00059
00060 m_mutex.lock();
00061 while (!m_queue.empty()) {
00062 m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
00063 m_newlyCancelled.insert(m_queue.begin()->first);
00064 m_queue.erase(m_queue.begin());
00065 }
00066
00067 m_exiting = true;
00068 m_mutex.unlock();
00069
00070 m_condition.wakeAll();
00071
00072 #ifdef DEBUG_FILE_READ_THREAD
00073 std::cerr << "FileReadThread::finish() exiting" << std::endl;
00074 #endif
00075 }
00076
00077 int
00078 FileReadThread::request(const Request &request)
00079 {
00080 m_mutex.lock();
00081
00082 int token = m_nextToken++;
00083 m_queue[token] = request;
00084
00085 m_mutex.unlock();
00086 m_condition.wakeAll();
00087
00088 return token;
00089 }
00090
00091 void
00092 FileReadThread::cancel(int token)
00093 {
00094 m_mutex.lock();
00095
00096 if (m_queue.find(token) != m_queue.end()) {
00097 m_cancelledRequests[token] = m_queue[token];
00098 m_queue.erase(token);
00099 m_newlyCancelled.insert(token);
00100 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00101 m_cancelledRequests[token] = m_readyRequests[token];
00102 m_readyRequests.erase(token);
00103 } else {
00104 std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
00105 }
00106
00107 m_mutex.unlock();
00108
00109 #ifdef DEBUG_FILE_READ_THREAD
00110 std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
00111 #endif
00112
00113 m_condition.wakeAll();
00114 }
00115
00116 bool
00117 FileReadThread::isReady(int token)
00118 {
00119 m_mutex.lock();
00120
00121 bool ready = m_readyRequests.find(token) != m_readyRequests.end();
00122
00123 m_mutex.unlock();
00124 return ready;
00125 }
00126
00127 bool
00128 FileReadThread::isCancelled(int token)
00129 {
00130 m_mutex.lock();
00131
00132 bool cancelled =
00133 m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
00134 m_newlyCancelled.find(token) == m_newlyCancelled.end();
00135
00136 m_mutex.unlock();
00137 return cancelled;
00138 }
00139
00140 bool
00141 FileReadThread::getRequest(int token, Request &request)
00142 {
00143 m_mutex.lock();
00144
00145 bool found = false;
00146
00147 if (m_queue.find(token) != m_queue.end()) {
00148 request = m_queue[token];
00149 found = true;
00150 } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00151 request = m_cancelledRequests[token];
00152 found = true;
00153 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00154 request = m_readyRequests[token];
00155 found = true;
00156 }
00157
00158 m_mutex.unlock();
00159
00160 return found;
00161 }
00162
00163 void
00164 FileReadThread::done(int token)
00165 {
00166 m_mutex.lock();
00167
00168 bool found = false;
00169
00170 if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00171 m_cancelledRequests.erase(token);
00172 m_newlyCancelled.erase(token);
00173 found = true;
00174 } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00175 m_readyRequests.erase(token);
00176 found = true;
00177 } else if (m_queue.find(token) != m_queue.end()) {
00178 std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
00179 }
00180
00181 m_mutex.unlock();
00182
00183 if (!found) {
00184 std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
00185 }
00186 }
00187
00188 void
00189 FileReadThread::process()
00190 {
00191
00192
00193 #ifdef DEBUG_FILE_READ_THREAD
00194 Profiler profiler("FileReadThread::process()", true);
00195 #endif
00196
00197 int token = m_queue.begin()->first;
00198 Request request = m_queue.begin()->second;
00199
00200 m_mutex.unlock();
00201
00202 #ifdef DEBUG_FILE_READ_THREAD
00203 std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
00204 #endif
00205
00206 bool successful = false;
00207 bool seekFailed = false;
00208 ssize_t r = 0;
00209
00210 if (request.mutex) request.mutex->lock();
00211
00212 if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
00213 seekFailed = true;
00214 } else {
00215
00216
00217
00218
00219 static const size_t blockSize = 256 * 1024;
00220
00221 size_t size = request.size;
00222 char *destination = request.data;
00223
00224 while (size > 0) {
00225 size_t readSize = size;
00226 if (readSize > blockSize) readSize = blockSize;
00227 ssize_t br = ::read(request.fd, destination, readSize);
00228 if (br < 0) {
00229 r = br;
00230 break;
00231 } else {
00232 r += br;
00233 if (br < ssize_t(readSize)) break;
00234 }
00235 destination += readSize;
00236 size -= readSize;
00237 }
00238 }
00239
00240 if (request.mutex) request.mutex->unlock();
00241
00242 if (seekFailed) {
00243 ::perror("Seek failed");
00244 std::cerr << "ERROR: FileReadThread::process: seek to "
00245 << request.start << " failed" << std::endl;
00246 request.size = 0;
00247 } else {
00248 if (r < 0) {
00249 ::perror("ERROR: FileReadThread::process: Read failed");
00250 request.size = 0;
00251 } else if (r < ssize_t(request.size)) {
00252 std::cerr << "WARNING: FileReadThread::process: read "
00253 << request.size << " returned only " << r << " bytes"
00254 << std::endl;
00255 request.size = r;
00256 usleep(100000);
00257 } else {
00258 successful = true;
00259 }
00260 }
00261
00262
00263
00264
00265 m_mutex.lock();
00266
00267 request.successful = successful;
00268
00269 if (m_queue.find(token) != m_queue.end() && !m_exiting) {
00270 m_queue.erase(token);
00271 m_readyRequests[token] = request;
00272 #ifdef DEBUG_FILE_READ_THREAD
00273 std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
00274 #endif
00275 } else {
00276 #ifdef DEBUG_FILE_READ_THREAD
00277 std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
00278 #endif
00279 }
00280 }
00281
00282 void
00283 FileReadThread::notifyCancelled()
00284 {
00285
00286
00287 while (!m_newlyCancelled.empty()) {
00288
00289 int token = *m_newlyCancelled.begin();
00290
00291 #ifdef DEBUG_FILE_READ_THREAD
00292 std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
00293 #endif
00294
00295 m_newlyCancelled.erase(token);
00296 }
00297 }
00298
00299