FileReadThread.cpp

Go to the documentation of this file.
00001 /* -*- c-basic-offset: 4 indent-tabs-mode: nil -*-  vi:set ts=8 sts=4 sw=4: */
00002 
00003 /*
00004     Sonic Visualiser
00005     An audio file viewer and annotation editor.
00006     Centre for Digital Music, Queen Mary, University of London.
00007     This file copyright 2006 Chris Cannam.
00008     
00009     This program is free software; you can redistribute it and/or
00010     modify it under the terms of the GNU General Public License as
00011     published by the Free Software Foundation; either version 2 of the
00012     License, or (at your option) any later version.  See the file
00013     COPYING included with this distribution for more information.
00014 */
00015 
00016 #include "FileReadThread.h"
00017 
00018 #include "base/Profiler.h"
00019 
00020 #include <iostream>
00021 #include <unistd.h>
00022 
00023 //#define DEBUG_FILE_READ_THREAD 1
00024 
00025 FileReadThread::FileReadThread() :
00026     m_nextToken(0),
00027     m_exiting(false)
00028 {
00029 }
00030 
00031 void
00032 FileReadThread::run()
00033 {
00034     m_mutex.lock();
00035 
00036     while (!m_exiting) {
00037         if (m_queue.empty()) {
00038             m_condition.wait(&m_mutex, 1000);
00039         } else {
00040             process();
00041         }
00042         notifyCancelled();
00043     }
00044 
00045     notifyCancelled();
00046     m_mutex.unlock();
00047 
00048 #ifdef DEBUG_FILE_READ_THREAD
00049     std::cerr << "FileReadThread::run() exiting" << std::endl;
00050 #endif
00051 }
00052 
00053 void
00054 FileReadThread::finish()
00055 {
00056 #ifdef DEBUG_FILE_READ_THREAD
00057     std::cerr << "FileReadThread::finish()" << std::endl;
00058 #endif
00059 
00060     m_mutex.lock();
00061     while (!m_queue.empty()) {
00062         m_cancelledRequests[m_queue.begin()->first] = m_queue.begin()->second;
00063         m_newlyCancelled.insert(m_queue.begin()->first);
00064         m_queue.erase(m_queue.begin());
00065     }
00066 
00067     m_exiting = true;
00068     m_mutex.unlock();
00069 
00070     m_condition.wakeAll();
00071 
00072 #ifdef DEBUG_FILE_READ_THREAD
00073     std::cerr << "FileReadThread::finish() exiting" << std::endl;
00074 #endif
00075 }
00076 
00077 int
00078 FileReadThread::request(const Request &request)
00079 {
00080     m_mutex.lock();
00081     
00082     int token = m_nextToken++;
00083     m_queue[token] = request;
00084 
00085     m_mutex.unlock();
00086     m_condition.wakeAll();
00087 
00088     return token;
00089 }
00090 
00091 void
00092 FileReadThread::cancel(int token)
00093 {
00094     m_mutex.lock();
00095 
00096     if (m_queue.find(token) != m_queue.end()) {
00097         m_cancelledRequests[token] = m_queue[token];
00098         m_queue.erase(token);
00099         m_newlyCancelled.insert(token);
00100     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00101         m_cancelledRequests[token] = m_readyRequests[token];
00102         m_readyRequests.erase(token);
00103     } else {
00104         std::cerr << "WARNING: FileReadThread::cancel: token " << token << " not found" << std::endl;
00105     }
00106 
00107     m_mutex.unlock();
00108 
00109 #ifdef DEBUG_FILE_READ_THREAD
00110     std::cerr << "FileReadThread::cancel(" << token << ") waking condition" << std::endl;
00111 #endif
00112 
00113     m_condition.wakeAll();
00114 }
00115 
00116 bool
00117 FileReadThread::isReady(int token)
00118 {
00119     m_mutex.lock();
00120 
00121     bool ready = m_readyRequests.find(token) != m_readyRequests.end();
00122 
00123     m_mutex.unlock();
00124     return ready;
00125 }
00126 
00127 bool
00128 FileReadThread::isCancelled(int token)
00129 {
00130     m_mutex.lock();
00131 
00132     bool cancelled = 
00133         m_cancelledRequests.find(token) != m_cancelledRequests.end() &&
00134         m_newlyCancelled.find(token) == m_newlyCancelled.end();
00135 
00136     m_mutex.unlock();
00137     return cancelled;
00138 }
00139 
00140 bool
00141 FileReadThread::getRequest(int token, Request &request)
00142 {
00143     m_mutex.lock();
00144 
00145     bool found = false;
00146 
00147     if (m_queue.find(token) != m_queue.end()) {
00148         request = m_queue[token];
00149         found = true;
00150     } else if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00151         request = m_cancelledRequests[token];
00152         found = true;
00153     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00154         request = m_readyRequests[token];
00155         found = true;
00156     }
00157 
00158     m_mutex.unlock();
00159     
00160     return found;
00161 }
00162 
00163 void
00164 FileReadThread::done(int token)
00165 {
00166     m_mutex.lock();
00167 
00168     bool found = false;
00169 
00170     if (m_cancelledRequests.find(token) != m_cancelledRequests.end()) {
00171         m_cancelledRequests.erase(token);
00172         m_newlyCancelled.erase(token);
00173         found = true;
00174     } else if (m_readyRequests.find(token) != m_readyRequests.end()) {
00175         m_readyRequests.erase(token);
00176         found = true;
00177     } else if (m_queue.find(token) != m_queue.end()) {
00178         std::cerr << "WARNING: FileReadThread::done(" << token << "): request is still in queue (wait or cancel it)" << std::endl;
00179     }
00180 
00181     m_mutex.unlock();
00182 
00183     if (!found) {
00184         std::cerr << "WARNING: FileReadThread::done(" << token << "): request not found" << std::endl;
00185     }
00186 }
00187 
00188 void
00189 FileReadThread::process()
00190 {
00191     // entered with m_mutex locked and m_queue non-empty
00192 
00193 #ifdef DEBUG_FILE_READ_THREAD
00194     Profiler profiler("FileReadThread::process()", true);
00195 #endif
00196 
00197     int token = m_queue.begin()->first;
00198     Request request = m_queue.begin()->second;
00199 
00200     m_mutex.unlock();
00201 
00202 #ifdef DEBUG_FILE_READ_THREAD
00203     std::cerr << "FileReadThread::process: reading " << request.start << ", " << request.size << " on " << request.fd << std::endl;
00204 #endif
00205 
00206     bool successful = false;
00207     bool seekFailed = false;
00208     ssize_t r = 0;
00209 
00210     if (request.mutex) request.mutex->lock();
00211 
00212     if (::lseek(request.fd, request.start, SEEK_SET) == (off_t)-1) {
00213         seekFailed = true;
00214     } else {
00215         
00216         // if request.size is large, we want to avoid making a single
00217         // system call to read it all as it may block too much
00218 
00219         static const size_t blockSize = 256 * 1024;
00220         
00221         size_t size = request.size;
00222         char *destination = request.data;
00223 
00224         while (size > 0) {
00225             size_t readSize = size;
00226             if (readSize > blockSize) readSize = blockSize;
00227             ssize_t br = ::read(request.fd, destination, readSize);
00228             if (br < 0) { 
00229                 r = br;
00230                 break;
00231             } else {
00232                 r += br;
00233                 if (br < ssize_t(readSize)) break;
00234             }
00235             destination += readSize;
00236             size -= readSize;
00237         }
00238     }
00239 
00240     if (request.mutex) request.mutex->unlock();
00241 
00242     if (seekFailed) {
00243         ::perror("Seek failed");
00244         std::cerr << "ERROR: FileReadThread::process: seek to "
00245                   << request.start << " failed" << std::endl;
00246         request.size = 0;
00247     } else {
00248         if (r < 0) {
00249             ::perror("ERROR: FileReadThread::process: Read failed");
00250             request.size = 0;
00251         } else if (r < ssize_t(request.size)) {
00252             std::cerr << "WARNING: FileReadThread::process: read "
00253                       << request.size << " returned only " << r << " bytes"
00254                       << std::endl;
00255             request.size = r;
00256             usleep(100000);
00257         } else {
00258             successful = true;
00259         }
00260     }
00261         
00262     // Check that the token hasn't been cancelled and the thread
00263     // hasn't been asked to finish
00264     
00265     m_mutex.lock();
00266 
00267     request.successful = successful;
00268         
00269     if (m_queue.find(token) != m_queue.end() && !m_exiting) {
00270         m_queue.erase(token);
00271         m_readyRequests[token] = request;
00272 #ifdef DEBUG_FILE_READ_THREAD
00273         std::cerr << "FileReadThread::process: done, marking as ready" << std::endl;
00274 #endif
00275     } else {
00276 #ifdef DEBUG_FILE_READ_THREAD
00277         std::cerr << "FileReadThread::process: request disappeared or exiting" << std::endl;
00278 #endif
00279     }
00280 }
00281 
00282 void
00283 FileReadThread::notifyCancelled()
00284 {
00285     // entered with m_mutex locked
00286 
00287     while (!m_newlyCancelled.empty()) {
00288 
00289         int token = *m_newlyCancelled.begin();
00290 
00291 #ifdef DEBUG_FILE_READ_THREAD
00292         std::cerr << "FileReadThread::notifyCancelled: token " << token << std::endl;
00293 #endif
00294 
00295         m_newlyCancelled.erase(token);
00296     }
00297 }
00298         
00299     

Generated on Wed Feb 20 15:45:25 2008 for SonicVisualiser by  doxygen 1.5.1