Fennel: /home/pub/open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/common/FennelResource.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/tuple/StoredTypeDescriptor.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 00030 #include "fennel/flatfile/FlatFileExecStreamImpl.h" 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $"); 00033 00034 FlatFileExecStream FlatFileExecStream::newFlatFileExecStream() 00035 { 00036 return new FlatFileExecStreamImpl(); 00037 } 00038 00039 00040 00041 const uint FlatFileExecStreamImpl::MAX_ROW_ERROR_TEXT_WIDTH = 4000; 00042 00043 void FlatFileExecStreamImpl::prepare( 00044 FlatFileExecStreamParams const &params) 00045 { 00046 SingleOutputExecStream::prepare(params); 00047 00048 header = params.header; 00049 dataFilePath = params.dataFilePath; 00050 lenient = params.lenient; 00051 trim = params.trim; 00052 mapped = params.mapped; 00053 columnNames = params.columnNames; 00054 00055 dataTuple.compute(pOutAccessor->getTupleDesc()); 00056 00057 scratchAccessor = params.scratchAccessor; 00058 bufferLock.accessSegment(scratchAccessor); 00059 00060 mode = params.mode; 00061 rowDesc = readTupleDescriptor(pOutAccessor->getTupleDesc()); 00062 rowDesc.setLenient(lenient); 00063 pBuffer.reset( 00064 new FlatFileBuffer(params.dataFilePath), 00065 ClosableObjectDestructor()); 00066 pParser.reset(new FlatFileParser( 00067 params.fieldDelim, params.rowDelim, 00068 params.quoteChar, params.escapeChar, 00069 params.trim)); 00070 00071 numRowsScan = params.numRowsScan; 00072 textDesc = params.outputTupleDesc; 00073 } 00074 00075 void FlatFileExecStreamImpl::getResourceRequirements( 00076 ExecStreamResourceQuantity &minQuantity, 00077 ExecStreamResourceQuantity &optQuantity) 00078 { 00079 SingleOutputExecStream::getResourceRequirements(minQuantity,optQuantity); 00080 minQuantity.nCachePages += 2; 00081 optQuantity = minQuantity; 00082 } 00083 00084 void FlatFileExecStreamImpl::open(bool restart) 00085 { 00086 if (restart) { 00087 releaseResources(); 00088 } 00089 SingleOutputExecStream::open(restart); 00090 00091 if (!restart) { 00092 bufferLock.allocatePage(); 00093 uint cbPageSize = bufferLock.getPage().getCache().getPageSize(); 00094 pBufferStorage = bufferLock.getPage().getWritableData(); 00095 pBuffer->setStorage((char)pBufferStorage, cbPageSize); 00096 } 00097 pBuffer->open(); 00098 pBuffer->read(); 00099 next = pBuffer->getReadPtr(); 00100 isRowPending = false; 00101 nRowsOutput = nRowErrors = 0; 00102 lastResult.reset(); 00103 00104 if (header) { 00105 FlatFileRowDescriptor headerDesc; 00106 for (uint i = 0; i < rowDesc.size(); i++) { 00107 headerDesc.push_back( 00108 FlatFileColumnDescriptor( 00109 FLAT_FILE_MAX_COLUMN_NAME_LEN)); 00110 } 00111 headerDesc.setLenient(lenient); 00112 if (mapped) { 00113 headerDesc.setUnbounded(); 00114 } 00115 pParser->scanRow( 00116 pBuffer->getReadPtr(), pBuffer->getSize(), headerDesc, lastResult); 00117 pBuffer->setReadPtr(lastResult.next); 00118 if (lastResult.status != FlatFileRowParseResult::NO_STATUS) { 00119 logError(lastResult); 00120 try { 00121 checkRowDelimiter(); 00122 } catch (FennelExcn e) { 00123 reason = e.getMessage(); 00124 } 00125 throw FennelExcn( 00126 FennelResource::instance().flatfileNoHeader( 00127 dataFilePath, reason)); 00128 } 00129 00130
00131
00132
00133 if (mapped) { 00134 if (! lenient) { 00135 throw FennelExcn( 00136 FennelResource::instance() 00137 .flatfileMappedRequiresLenient()); 00138 } 00139 00140 pParser->stripQuoting(lastResult, true); 00141 uint nFields = lastResult.getReadCount(); 00142 int found = 0; 00143 00144 VectorOfUint columnMap; 00145 columnMap.resize(nFields); 00146 for (uint i = 0; i < nFields; i++) { 00147 char *n = lastResult.getColumn(i); 00148 if (n == NULL) { 00149 columnMap[i] = MAXU; 00150 } else { 00151 std::string name( 00152 n, 00153 lastResult.getColumnSize(i)); 00154 columnMap[i] = findField(name); 00155 if (isMAXU(columnMap[i])) { 00156 found++; 00157 } 00158 } 00159 } 00160 if (found == 0) { 00161 throw FennelExcn( 00162 FennelResource::instance().flatfileNoMappedColumns( 00163 std::string(" "), 00164 std::string(" "))); 00165 } 00166 rowDesc.setMap(columnMap); 00167 } 00168 } 00169 00170 done = false; 00171 } 00172 00173 ExecStreamResult FlatFileExecStreamImpl::execute( 00174 ExecStreamQuantum const &quantum) 00175 { 00176
00177 if (done && isRowPending) { 00178 pOutAccessor->markEOS(); 00179 return EXECRC_EOS; 00180 } 00181
00182 if (pOutAccessor->getState() == EXECBUF_OVERFLOW 00183 || pOutAccessor->getState() == EXECBUF_EOS) { 00184 return EXECRC_BUF_OVERFLOW; 00185 } 00186 00187
00188 for (uint nTuples = 0; nTuples < quantum.nTuplesMax;) { 00189
00190 while (isRowPending) { 00191
00192
00193 if (nTuples >= quantum.nTuplesMax) { 00194 break; 00195 } 00196 00197 if ((numRowsScan > 0 && numRowsScan == nRowsOutput) 00198 || pBuffer->isDone()) 00199 { 00200 done = true; 00201 break; 00202 } 00203 pParser->scanRow( 00204 pBuffer->getReadPtr(),pBuffer->getSize(),rowDesc,lastResult); 00205 nTuples++; 00206 00207 switch (lastResult.status) { 00208 case FlatFileRowParseResult::INCOMPLETE_COLUMN: 00209 if (pBuffer->isFull()) { 00210 lastResult.status = FlatFileRowParseResult::ROW_TOO_LARGE; 00211 } else if (pBuffer->isComplete()) { 00212 pBuffer->read(); 00213 continue; 00214 } 00215 case FlatFileRowParseResult::NO_COLUMN_DELIM: 00216 case FlatFileRowParseResult::TOO_FEW_COLUMNS: 00217 case FlatFileRowParseResult::TOO_MANY_COLUMNS: 00218 logError(lastResult); 00219 nRowErrors++; 00220 pBuffer->setReadPtr(lastResult.next); 00221 continue; 00222 case FlatFileRowParseResult::NO_STATUS: 00223 handleTuple(lastResult, dataTuple); 00224 pBuffer->setReadPtr(lastResult.next); 00225 break; 00226 default: 00227 permAssert(false); 00228 } 00229 } 00230 00231
00232 if (mode == FLATFILE_MODE_DESCRIBE && done && isRowPending) { 00233 describeStream(dataTuple); 00234 } 00235 00236
00237 if (isRowPending) { 00238 if (pOutAccessor->produceTuple(dataTuple)) { 00239 return EXECRC_BUF_OVERFLOW; 00240 } 00241 isRowPending = false; 00242 nRowsOutput++; 00243 } 00244 00245
00246 if (done) { 00247 pOutAccessor->markEOS(); 00248 return EXECRC_EOS; 00249 } 00250 } 00251 return EXECRC_QUANTUM_EXPIRED; 00252 } 00253 00254 FlatFileRowDescriptor FlatFileExecStreamImpl::readTupleDescriptor( 00255 const TupleDescriptor &tupleDesc) 00256 { 00257 StandardTypeDescriptorFactory typeFactory; 00258 FlatFileRowDescriptor rowDesc; 00259 for (uint i = 0; i < tupleDesc.size(); i++) { 00260 TupleAttributeDescriptor attr = tupleDesc[i]; 00261 StandardTypeDescriptorOrdinal ordinal = 00262 StandardTypeDescriptorOrdinal( 00263 attr.pTypeDescriptor->getOrdinal()); 00264 if (StandardTypeDescriptor::isTextArray(ordinal)) { 00265 rowDesc.push_back(FlatFileColumnDescriptor(attr.cbStorage)); 00266 } else { 00267 rowDesc.push_back( 00268 FlatFileColumnDescriptor(FLAT_FILE_MAX_NON_CHAR_VALUE_LEN)); 00269 } 00270 } 00271 if (mode == FLATFILE_MODE_DESCRIBE) { 00272 rowDesc.setUnbounded(); 00273 } 00274 return rowDesc; 00275 } 00276 00277 uint FlatFileExecStreamImpl::findField(const std::string &name) 00278 { 00279 for (uint i = 0; i < columnNames.size(); i++) { 00280 if (strcasecmp(name.c_str(), columnNames[i].c_str()) == 0) { 00281 return i; 00282 } 00283 } 00284 return MAXU; 00285 } 00286 00287 void FlatFileExecStreamImpl::handleTuple( 00288 FlatFileRowParseResult &result, 00289 TupleData &tuple) 00290 { 00291 TupleData *pTupleData = &tuple; 00292 00293
00294
00295
00296
00297 if (mode == FLATFILE_MODE_DESCRIBE) { 00298 if (fieldSizes.size() == 0) { 00299 fieldSizes.resize(result.getReadCount(), 0); 00300 } 00301
00302 if ((lenient) && fieldSizes.size() != result.getReadCount()) { 00303 FlatFileRowParseResult detail = result; 00304 if (detail.getReadCount() > fieldSizes.size()) { 00305 detail.status = FlatFileRowParseResult::TOO_MANY_COLUMNS; 00306 } else { 00307 detail.status = FlatFileRowParseResult::TOO_FEW_COLUMNS; 00308 } 00309 logError(detail); 00310 return; 00311 } 00312 } 00313 00314
00315 pParser->stripQuoting(result, trim); 00316 for (uint i = 0; i < result.getReadCount(); i++) { 00317 if (mode == FLATFILE_MODE_DESCRIBE) { 00318 if (i < fieldSizes.size()) { 00319 fieldSizes[i] = max(fieldSizes[i], result.getColumnSize(i)); 00320 } 00321 continue; 00322 } 00323 (*pTupleData)[i].pData = (PConstBuffer) result.getColumn(i); 00324
00325 (*pTupleData)[i].cbData = 00326 std::min(result.getColumnSize(i), textDesc[i].cbStorage); 00327 } 00328 00329 if (mode != FLATFILE_MODE_DESCRIBE) { 00330 isRowPending = true; 00331 } else { 00332
00333
00334 nRowsOutput++; 00335 } 00336 } 00337 00338 void FlatFileExecStreamImpl::describeStream(TupleData &tupleData) 00339 { 00340 if (fieldSizes.size() == 0) { 00341 throw FennelExcn( 00342 FennelResource::instance().flatfileDescribeFailed(dataFilePath)); 00343 } 00344 00345 std::ostringstream oss; 00346 for (int i = 0; i < fieldSizes.size(); i++) { 00347 oss << fieldSizes[i]; 00348 if (i != fieldSizes.size() - 1) { 00349 oss << " "; 00350 } 00351 } 00352
00353
00354 describeResult = oss.str(); 00355 const char *value = describeResult.c_str(); 00356 uint cbValue = describeResult.size() * sizeof(char); 00357 00358 assert(tupleData.size() == 1); 00359 tupleData[0].pData = (PConstBuffer) value; 00360 tupleData[0].cbData = cbValue; 00361 isRowPending = true; 00362 } 00363 00364 void FlatFileExecStreamImpl::logError(const FlatFileRowParseResult &result) 00365 { 00366 switch (result.status) { 00367 case FlatFileRowParseResult::INCOMPLETE_COLUMN: 00368 reason = FennelResource::instance().incompleteColumn(); 00369 break; 00370 case FlatFileRowParseResult::ROW_TOO_LARGE: 00371 reason = FennelResource::instance().rowTextTooLong(); 00372 break; 00373 case FlatFileRowParseResult::NO_COLUMN_DELIM: 00374 reason = FennelResource::instance().noColumnDelimiter(); 00375 break; 00376 case FlatFileRowParseResult::TOO_FEW_COLUMNS: 00377 reason = FennelResource::instance().tooFewColumns(); 00378 break; 00379 case FlatFileRowParseResult::TOO_MANY_COLUMNS: 00380 reason = FennelResource::instance().tooManyColumns(); 00381 break; 00382 default: 00383 permAssert(false); 00384 } 00385 logError(reason, result); 00386 } 00387 00388 void FlatFileExecStreamImpl::logError( 00389 const std::string reason, 00390 const FlatFileRowParseResult &result) 00391 { 00392 this->reason = reason; 00393 00394
00395 if (errorDesc.size() == 0) { 00396
00397 StandardTypeDescriptorFactory typeFactory; 00398 StoredTypeDescriptor const &typeDesc = 00399 typeFactory.newDataType(STANDARD_TYPE_VARCHAR); 00400 bool nullable = true; 00401 00402 errorDesc.push_back( 00403 TupleAttributeDescriptor( 00404 typeDesc, 00405 nullable, 00406 MAX_ROW_ERROR_TEXT_WIDTH)); 00407 00408 errorTuple.compute(errorDesc); 00409 } 00410 00411 uint length = result.next - result.current; 00412 length = std::min(length, MAX_ROW_ERROR_TEXT_WIDTH); 00413 errorTuple[0].pData = (PConstBuffer) result.current; 00414 errorTuple[0].cbData = length; 00415 00416 postError(ROW_ERROR, reason, errorDesc, errorTuple, -1); 00417 } 00418 00419 void FlatFileExecStreamImpl::checkRowDelimiter() 00420 { 00421 if (pBuffer->isDone() && lastResult.nRowDelimsRead == 0) { 00422 throw FennelExcn( 00423 FennelResource::instance().noRowDelimiter(dataFilePath)); 00424 } 00425 } 00426 00427 void FlatFileExecStreamImpl::closeImpl() 00428 { 00429 releaseResources(); 00430 SingleOutputExecStream::closeImpl(); 00431 } 00432 00433 void FlatFileExecStreamImpl::releaseResources() 00434 { 00435 if (pBuffer) { 00436 pBuffer->close(); 00437 } 00438 } 00439 00440 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/flatfile/FlatFileExecStreamImpl.cpp#2 $"); 00441 00442