00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "MatrixFile.h"
00017 #include "base/TempDirectory.h"
00018 #include "system/System.h"
00019 #include "base/Profiler.h"
00020 #include "base/Exceptions.h"
00021
00022 #include <sys/types.h>
00023 #include <sys/stat.h>
00024 #include <fcntl.h>
00025 #include <unistd.h>
00026
00027 #include <iostream>
00028
00029 #include <cstdio>
00030 #include <cassert>
00031
00032 #include <QFileInfo>
00033 #include <QDir>
00034
00035
00036
00037
00038 #ifdef DEBUG_MATRIX_FILE_READ_SET
00039 #ifndef DEBUG_MATRIX_FILE
00040 #define DEBUG_MATRIX_FILE 1
00041 #endif
00042 #endif
00043
00044 std::map<QString, int> MatrixFile::m_refcount;
00045 QMutex MatrixFile::m_refcountMutex;
00046
00047 MatrixFile::ResizeableBitsetMap MatrixFile::m_columnBitsets;
00048 QMutex MatrixFile::m_columnBitsetWriteMutex;
00049
00050 FileReadThread *MatrixFile::m_readThread = 0;
00051
00052 static size_t totalStorage = 0;
00053 static size_t totalMemory = 0;
00054 static size_t totalCount = 0;
00055
00056 MatrixFile::MatrixFile(QString fileBase, Mode mode,
00057 size_t cellSize, bool eagerCache) :
00058 m_fd(-1),
00059 m_mode(mode),
00060 m_flags(0),
00061 m_fmode(0),
00062 m_cellSize(cellSize),
00063 m_width(0),
00064 m_height(0),
00065 m_headerSize(2 * sizeof(size_t)),
00066 m_defaultCacheWidth(1024),
00067 m_prevX(0),
00068 m_eagerCache(eagerCache),
00069 m_requestToken(-1),
00070 m_spareData(0),
00071 m_columnBitset(0)
00072 {
00073 Profiler profiler("MatrixFile::MatrixFile", true);
00074
00075 if (!m_readThread) {
00076 m_readThread = new FileReadThread;
00077 m_readThread->start();
00078 }
00079
00080 m_cache.data = 0;
00081
00082 QDir tempDir(TempDirectory::getInstance()->getPath());
00083 QString fileName(tempDir.filePath(QString("%1.mfc").arg(fileBase)));
00084 bool newFile = !QFileInfo(fileName).exists();
00085
00086 if (newFile && m_mode == ReadOnly) {
00087 std::cerr << "ERROR: MatrixFile::MatrixFile: Read-only mode "
00088 << "specified, but cache file does not exist" << std::endl;
00089 throw FileNotFound(fileName);
00090 }
00091
00092 if (!newFile && m_mode == ReadWrite) {
00093 std::cerr << "Note: MatrixFile::MatrixFile: Read/write mode "
00094 << "specified, but file already exists; falling back to "
00095 << "read-only mode" << std::endl;
00096 m_mode = ReadOnly;
00097 }
00098
00099 if (!eagerCache && m_mode == ReadOnly) {
00100 std::cerr << "WARNING: MatrixFile::MatrixFile: Eager cacheing not "
00101 << "specified, but file is open in read-only mode -- cache "
00102 << "will not be used" << std::endl;
00103 }
00104
00105 m_flags = 0;
00106 m_fmode = S_IRUSR | S_IWUSR;
00107
00108 if (m_mode == ReadWrite) {
00109 m_flags = O_RDWR | O_CREAT;
00110 } else {
00111 m_flags = O_RDONLY;
00112 }
00113
00114 #ifdef _WIN32
00115 m_flags |= O_BINARY;
00116 #endif
00117
00118 #ifdef DEBUG_MATRIX_FILE
00119 std::cerr << "MatrixFile::MatrixFile: opening " << fileName.toStdString() << "..." << std::endl;
00120 #endif
00121
00122 if ((m_fd = ::open(fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
00123 ::perror("Open failed");
00124 std::cerr << "ERROR: MatrixFile::MatrixFile: "
00125 << "Failed to open cache file \""
00126 << fileName.toStdString() << "\"";
00127 if (m_mode == ReadWrite) std::cerr << " for writing";
00128 std::cerr << std::endl;
00129 throw FailedToOpenFile(fileName);
00130 }
00131
00132 if (newFile) {
00133 resize(0, 0);
00134 } else {
00135 size_t header[2];
00136 if (::read(m_fd, header, 2 * sizeof(size_t)) < 0) {
00137 ::perror("MatrixFile::MatrixFile: read failed");
00138 std::cerr << "ERROR: MatrixFile::MatrixFile: "
00139 << "Failed to read header (fd " << m_fd << ", file \""
00140 << fileName.toStdString() << "\")" << std::endl;
00141 throw FileReadFailed(fileName);
00142 }
00143 m_width = header[0];
00144 m_height = header[1];
00145 seekTo(0, 0);
00146 }
00147
00148 m_fileName = fileName;
00149
00150 m_columnBitsetWriteMutex.lock();
00151
00152 if (m_columnBitsets.find(m_fileName) == m_columnBitsets.end()) {
00153 m_columnBitsets[m_fileName] = new ResizeableBitset;
00154 }
00155 m_columnBitset = m_columnBitsets[m_fileName];
00156
00157 m_columnBitsetWriteMutex.unlock();
00158
00159 QMutexLocker locker(&m_refcountMutex);
00160 ++m_refcount[fileName];
00161
00162
00163
00164
00165
00166 ++totalCount;
00167
00168 }
00169
00170 MatrixFile::~MatrixFile()
00171 {
00172 char *requestData = 0;
00173
00174 if (m_requestToken >= 0) {
00175 FileReadThread::Request request;
00176 if (m_readThread->getRequest(m_requestToken, request)) {
00177 requestData = request.data;
00178 }
00179 m_readThread->cancel(m_requestToken);
00180 }
00181
00182 if (requestData) free(requestData);
00183 if (m_cache.data) free(m_cache.data);
00184 if (m_spareData) free(m_spareData);
00185
00186 if (m_fd >= 0) {
00187 if (::close(m_fd) < 0) {
00188 ::perror("MatrixFile::~MatrixFile: close failed");
00189 }
00190 }
00191
00192 if (m_fileName != "") {
00193
00194 QMutexLocker locker(&m_refcountMutex);
00195
00196 if (--m_refcount[m_fileName] == 0) {
00197
00198 if (::unlink(m_fileName.toLocal8Bit())) {
00199
00200
00201 } else {
00202
00203 }
00204
00205 QMutexLocker locker2(&m_columnBitsetWriteMutex);
00206 m_columnBitsets.erase(m_fileName);
00207 delete m_columnBitset;
00208 }
00209 }
00210
00211 totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
00212 totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);
00213 totalCount --;
00214
00215
00216
00217
00218
00219 }
00220
00221 void
00222 MatrixFile::resize(size_t w, size_t h)
00223 {
00224 Profiler profiler("MatrixFile::resize", true);
00225
00226 assert(m_mode == ReadWrite);
00227
00228 QMutexLocker locker(&m_fdMutex);
00229
00230 totalStorage -= (m_headerSize + (m_width * m_height * m_cellSize));
00231 totalMemory -= (2 * m_defaultCacheWidth * m_height * m_cellSize);
00232
00233 off_t off = m_headerSize + (w * h * m_cellSize);
00234
00235 #ifdef DEBUG_MATRIX_FILE
00236 std::cerr << "MatrixFile::resize(" << w << ", " << h << "): resizing file" << std::endl;
00237 #endif
00238
00239 if (w * h < m_width * m_height) {
00240 if (::ftruncate(m_fd, off) < 0) {
00241 ::perror("WARNING: MatrixFile::resize: ftruncate failed");
00242 throw FileOperationFailed(m_fileName, "ftruncate");
00243 }
00244 }
00245
00246 m_width = 0;
00247 m_height = 0;
00248
00249 if (::lseek(m_fd, 0, SEEK_SET) == (off_t)-1) {
00250 ::perror("ERROR: MatrixFile::resize: Seek to write header failed");
00251 throw FileOperationFailed(m_fileName, "lseek");
00252 }
00253
00254 size_t header[2];
00255 header[0] = w;
00256 header[1] = h;
00257 if (::write(m_fd, header, 2 * sizeof(size_t)) != 2 * sizeof(size_t)) {
00258 ::perror("ERROR: MatrixFile::resize: Failed to write header");
00259 throw FileOperationFailed(m_fileName, "write");
00260 }
00261
00262 if (w > 0 && m_defaultCacheWidth > w) {
00263 m_defaultCacheWidth = w;
00264 }
00265
00266 static size_t maxCacheMB = 16;
00267 if (2 * m_defaultCacheWidth * h * m_cellSize > maxCacheMB * 1024 * 1024) {
00268 m_defaultCacheWidth = (maxCacheMB * 1024 * 1024) / (2 * h * m_cellSize);
00269 if (m_defaultCacheWidth < 16) m_defaultCacheWidth = 16;
00270 }
00271
00272 if (m_columnBitset) {
00273 QMutexLocker locker(&m_columnBitsetWriteMutex);
00274 m_columnBitset->resize(w);
00275 }
00276
00277 if (m_cache.data) {
00278 free(m_cache.data);
00279 m_cache.data = 0;
00280 }
00281
00282 if (m_spareData) {
00283 free(m_spareData);
00284 m_spareData = 0;
00285 }
00286
00287 m_width = w;
00288 m_height = h;
00289
00290 totalStorage += (m_headerSize + (m_width * m_height * m_cellSize));
00291 totalMemory += (2 * m_defaultCacheWidth * m_height * m_cellSize);
00292
00293 #ifdef DEBUG_MATRIX_FILE
00294 std::cerr << "MatrixFile::resize(" << w << ", " << h << "): cache width "
00295 << m_defaultCacheWidth << ", storage "
00296 << (m_headerSize + w * h * m_cellSize) << ", mem "
00297 << (2 * h * m_defaultCacheWidth * m_cellSize) << std::endl;
00298
00299 std::cerr << "Total storage " << totalStorage/1024 << "K, theoretical max memory "
00300 << totalMemory/1024 << "K in " << totalCount << " instances" << std::endl;
00301 #endif
00302
00303 seekTo(0, 0);
00304 }
00305
00306 void
00307 MatrixFile::reset()
00308 {
00309 Profiler profiler("MatrixFile::reset", true);
00310
00311 assert (m_mode == ReadWrite);
00312
00313 if (m_eagerCache) {
00314 void *emptyCol = calloc(m_height, m_cellSize);
00315 for (size_t x = 0; x < m_width; ++x) setColumnAt(x, emptyCol);
00316 free(emptyCol);
00317 }
00318
00319 if (m_columnBitset) {
00320 QMutexLocker locker(&m_columnBitsetWriteMutex);
00321 m_columnBitset->resize(m_width);
00322 }
00323 }
00324
00325 void
00326 MatrixFile::getColumnAt(size_t x, void *data)
00327 {
00328
00329
00330
00331
00332 if (getFromCache(x, 0, m_height, data)) return;
00333
00334
00335
00336 ssize_t r = 0;
00337
00338 #ifdef DEBUG_MATRIX_FILE
00339 std::cerr << "MatrixFile::getColumnAt(" << x << ")"
00340 << ": reading the slow way";
00341
00342 if (m_requestToken >= 0 &&
00343 x >= m_requestingX &&
00344 x < m_requestingX + m_requestingWidth) {
00345
00346 std::cerr << " (awaiting " << m_requestingX << ", " << m_requestingWidth << " from disk)";
00347 }
00348
00349 std::cerr << std::endl;
00350 #endif
00351
00352 m_fdMutex.lock();
00353
00354 if (seekTo(x, 0)) {
00355 r = ::read(m_fd, data, m_height * m_cellSize);
00356 }
00357
00358 m_fdMutex.unlock();
00359
00360 if (r < 0) {
00361 ::perror("MatrixFile::getColumnAt: read failed");
00362 std::cerr << "ERROR: MatrixFile::getColumnAt: "
00363 << "Failed to read column " << x << " (height " << m_height << ", cell size " << m_cellSize << ", fd " << m_fd << ", file \""
00364 << m_fileName.toStdString() << "\")" << std::endl;
00365 throw FileReadFailed(m_fileName);
00366 }
00367
00368 return;
00369 }
00370
00371 bool
00372 MatrixFile::getFromCache(size_t x, size_t ystart, size_t ycount, void *data)
00373 {
00374 m_cacheMutex.lock();
00375
00376 if (!m_cache.data || x < m_cache.x || x >= m_cache.x + m_cache.width) {
00377 bool left = (m_cache.data && x < m_cache.x);
00378 m_cacheMutex.unlock();
00379 primeCache(x, left);
00380 m_prevX = x;
00381 return false;
00382 }
00383
00384 memcpy(data,
00385 m_cache.data + m_cellSize * ((x - m_cache.x) * m_height + ystart),
00386 ycount * m_cellSize);
00387
00388 m_cacheMutex.unlock();
00389
00390 if (m_cache.x > 0 && x < m_prevX && x < m_cache.x + m_cache.width/4) {
00391 primeCache(x, true);
00392 }
00393
00394 if (m_cache.x + m_cache.width < m_width &&
00395 x > m_prevX &&
00396 x > m_cache.x + (m_cache.width * 3) / 4) {
00397 primeCache(x, false);
00398 }
00399
00400 m_prevX = x;
00401 return true;
00402 }
00403
00404 void
00405 MatrixFile::setColumnAt(size_t x, const void *data)
00406 {
00407 assert(m_mode == ReadWrite);
00408
00409 #ifdef DEBUG_MATRIX_FILE_READ_SET
00410 std::cerr << "MatrixFile::setColumnAt(" << x << ")" << std::endl;
00411 #endif
00412
00413 ssize_t w = 0;
00414 bool seekFailed = false;
00415
00416 m_fdMutex.lock();
00417
00418 if (seekTo(x, 0)) {
00419 w = ::write(m_fd, data, m_height * m_cellSize);
00420 } else {
00421 seekFailed = true;
00422 }
00423
00424 m_fdMutex.unlock();
00425
00426 if (!seekFailed && w != ssize_t(m_height * m_cellSize)) {
00427 ::perror("WARNING: MatrixFile::setColumnAt: write failed");
00428 throw FileOperationFailed(m_fileName, "write");
00429 } else if (seekFailed) {
00430 throw FileOperationFailed(m_fileName, "seek");
00431 } else {
00432 QMutexLocker locker(&m_columnBitsetWriteMutex);
00433 m_columnBitset->set(x);
00434 }
00435 }
00436
00437 void
00438 MatrixFile::suspend()
00439 {
00440 QMutexLocker locker(&m_fdMutex);
00441 QMutexLocker locker2(&m_cacheMutex);
00442
00443 if (m_fd < 0) return;
00444
00445 #ifdef DEBUG_MATRIX_FILE
00446 std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::suspend(): fd was " << m_fd << std::endl;
00447 #endif
00448
00449 if (m_requestToken >= 0) {
00450 void *data = 0;
00451 FileReadThread::Request request;
00452 if (m_readThread->getRequest(m_requestToken, request)) {
00453 data = request.data;
00454 }
00455 m_readThread->cancel(m_requestToken);
00456 if (data) free(data);
00457 m_requestToken = -1;
00458 }
00459
00460 if (m_cache.data) {
00461 free(m_cache.data);
00462 m_cache.data = 0;
00463 }
00464
00465 if (m_spareData) {
00466 free(m_spareData);
00467 m_spareData = 0;
00468 }
00469
00470 if (::close(m_fd) < 0) {
00471 ::perror("WARNING: MatrixFile::suspend: close failed");
00472 throw FileOperationFailed(m_fileName, "close");
00473 }
00474
00475 m_fd = -1;
00476 }
00477
00478 void
00479 MatrixFile::resume()
00480 {
00481 if (m_fd >= 0) return;
00482
00483 #ifdef DEBUG_MATRIX_FILE
00484 std::cerr << "MatrixFile(" << this << ")::resume()" << std::endl;
00485 #endif
00486
00487 if ((m_fd = ::open(m_fileName.toLocal8Bit(), m_flags, m_fmode)) < 0) {
00488 ::perror("Open failed");
00489 std::cerr << "ERROR: MatrixFile::resume: "
00490 << "Failed to open cache file \""
00491 << m_fileName.toStdString() << "\"";
00492 if (m_mode == ReadWrite) std::cerr << " for writing";
00493 std::cerr << std::endl;
00494 throw FailedToOpenFile(m_fileName);
00495 }
00496
00497 std::cerr << "MatrixFile(" << this << ":" << m_fileName.toStdString() << ")::resume(): fd is " << m_fd << std::endl;
00498 }
00499
00500 void
00501 MatrixFile::primeCache(size_t x, bool goingLeft)
00502 {
00503
00504
00505 #ifdef DEBUG_MATRIX_FILE_READ_SET
00506 std::cerr << "MatrixFile::primeCache(" << x << ", " << goingLeft << ")" << std::endl;
00507 #endif
00508
00509 size_t rx = x;
00510 size_t rw = m_defaultCacheWidth;
00511
00512 size_t left = rw / 3;
00513 if (goingLeft) left = (rw * 2) / 3;
00514
00515 if (rx > left) rx -= left;
00516 else rx = 0;
00517
00518 if (rx + rw > m_width) rw = m_width - rx;
00519
00520 if (!m_eagerCache) {
00521
00522 size_t ti = 0;
00523
00524 for (ti = 0; ti < rw; ++ti) {
00525 if (!m_columnBitset->get(rx + ti)) break;
00526 }
00527
00528 #ifdef DEBUG_MATRIX_FILE
00529 if (ti < rw) {
00530 std::cerr << "eagerCache is false and there's a hole at "
00531 << rx + ti << ", reducing rw from " << rw << " to "
00532 << ti << std::endl;
00533 }
00534 #endif
00535
00536 rw = std::min(rw, ti);
00537 if (rw < 10 || rx + rw <= x) return;
00538 }
00539
00540 QMutexLocker locker(&m_cacheMutex);
00541
00542 FileReadThread::Request request;
00543
00544 if (m_requestToken >= 0 &&
00545 m_readThread->getRequest(m_requestToken, request)) {
00546
00547 if (x >= m_requestingX &&
00548 x < m_requestingX + m_requestingWidth) {
00549
00550 if (m_readThread->isReady(m_requestToken)) {
00551
00552 if (!request.successful) {
00553 std::cerr << "ERROR: MatrixFile::primeCache: Last request was unsuccessful" << std::endl;
00554 throw FileReadFailed(m_fileName);
00555 }
00556
00557 #ifdef DEBUG_MATRIX_FILE_READ_SET
00558 std::cerr << "last request is ready! (" << m_requestingX << ", "<< m_requestingWidth << ")" << std::endl;
00559 #endif
00560
00561 m_cache.x = (request.start - m_headerSize) / (m_height * m_cellSize);
00562 m_cache.width = request.size / (m_height * m_cellSize);
00563
00564 #ifdef DEBUG_MATRIX_FILE_READ_SET
00565 std::cerr << "received last request: actual size is: " << m_cache.x << ", " << m_cache.width << std::endl;
00566 #endif
00567
00568 if (m_cache.data) {
00569 if (m_spareData) free(m_spareData);
00570 m_spareData = m_cache.data;
00571 }
00572 m_cache.data = request.data;
00573
00574 m_readThread->done(m_requestToken);
00575 m_requestToken = -1;
00576 }
00577
00578
00579 return;
00580 }
00581
00582
00583 m_readThread->cancel(m_requestToken);
00584
00585
00586 while (!m_readThread->isCancelled(m_requestToken)) {
00587 usleep(10000);
00588 }
00589
00590 #ifdef DEBUG_MATRIX_FILE_READ_SET
00591 std::cerr << "cancelled " << m_requestToken << std::endl;
00592 #endif
00593
00594 if (m_spareData) free(m_spareData);
00595 m_spareData = request.data;
00596 m_readThread->done(m_requestToken);
00597
00598 m_requestToken = -1;
00599 }
00600
00601 if (m_fd < 0) {
00602 m_fdMutex.lock();
00603 if (m_fd < 0) resume();
00604 m_fdMutex.unlock();
00605 }
00606
00607 request.fd = m_fd;
00608 request.mutex = &m_fdMutex;
00609 request.start = m_headerSize + rx * m_height * m_cellSize;
00610 request.size = rw * m_height * m_cellSize;
00611 request.data = (char *)realloc(m_spareData, rw * m_height * m_cellSize);
00612 MUNLOCK(request.data, rw * m_height * m_cellSize);
00613 m_spareData = 0;
00614
00615 m_requestingX = rx;
00616 m_requestingWidth = rw;
00617
00618 int token = m_readThread->request(request);
00619 #ifdef DEBUG_MATRIX_FILE_READ_SET
00620 std::cerr << "MatrixFile::primeCache: request token is "
00621 << token << " (x = [" << rx << "], w = [" << rw << "], left = [" << goingLeft << "])" << std::endl;
00622 #endif
00623 m_requestToken = token;
00624 }
00625
00626 bool
00627 MatrixFile::seekTo(size_t x, size_t y)
00628 {
00629 if (m_fd < 0) resume();
00630
00631 off_t off = m_headerSize + (x * m_height + y) * m_cellSize;
00632
00633 if (::lseek(m_fd, off, SEEK_SET) == (off_t)-1) {
00634 ::perror("Seek failed");
00635 std::cerr << "ERROR: MatrixFile::seekTo(" << x << ", " << y
00636 << ") failed" << std::endl;
00637 return false;
00638 }
00639
00640 return true;
00641 }
00642