Fennel: /home/pub/open/dev/fennel/test/FlatFileExecStreamTest.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/test/ExecStreamUnitTestBase.h" 00025 #include "fennel/exec/MockProducerExecStream.h" 00026 #include "fennel/exec/ExecStreamEmbryo.h" 00027 00028 #include "fennel/exec/ExecStreamGraph.h" 00029 #include "fennel/exec/ExecStreamGraphEmbryo.h" 00030 #include "fennel/exec/ExecStreamScheduler.h" 00031 #include "fennel/exec/ExecStream.h" 00032 #include "fennel/exec/ScratchBufferExecStream.h" 00033 #include "fennel/exec/ExecStreamEmbryo.h" 00034 #include "fennel/exec/ExecStreamBufAccessor.h" 00035 #include "fennel/exec/MockProducerExecStream.h" 00036 #include "fennel/tuple/TuplePrinter.h" 00037 #include "fennel/tuple/StandardTypeDescriptor.h" 00038 00039 #include <boost/test/test_tools.hpp> 00040 00041 #include "fennel/flatfile/FlatFileBuffer.h" 00042 #include "fennel/flatfile/FlatFileParser.h" 00043 #include "fennel/flatfile/FlatFileExecStream.h" 00044 00045 using namespace fennel; 00046 00051 class StringExecStreamGenerator 00052 { 00053 public: 00054 virtual ~StringExecStreamGenerator() {} 00055 00061 virtual const std::string &generateValue(uint iRow) = 0; 00062 }; 00063 00064 class StringExecStreamGeneratorImpl : public StringExecStreamGenerator 00065 { 00066 std::vectorstd::string values; 00067 00068 public: 00069 void insert(const std::string &value) 00070 { 00071 values.push_back(value); 00072 } 00073 00074
00075 const std::string &generateValue(uint iRow) 00076 { 00077 BOOST_CHECK(iRow < values.size()); 00078 return values[iRow]; 00079 } 00080 }; 00081 00082 class FlatFileExecStreamTest : public ExecStreamUnitTestBase 00083 { 00084 void checkRead( 00085 FlatFileBuffer &buffer, 00086 const char *string); 00087 00088 void checkTrim( 00089 FlatFileParser &parser, 00090 const char *string, 00091 const char *result); 00092 00093 void checkStrip( 00094 FlatFileParser &parser, 00095 const char *string, 00096 const char *result); 00097 00098 void checkColumnScan( 00099 FlatFileParser &parser, 00100 const char *string, 00101 FlatFileColumnParseResult::DelimiterType type, 00102 uint size, 00103 uint offset); 00104 00105 void verifyOutput( 00106 ExecStream &stream, 00107 uint nRowsExpected, 00108 StringExecStreamGenerator &generator); 00109 00110 public: 00111 explicit FlatFileExecStreamTest() 00112 { 00113 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testBuffer); 00114 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testParser); 00115 FENNEL_UNIT_TEST_CASE(FlatFileExecStreamTest, testStream); 00116 } 00117 00118 void testBuffer(); 00119 void testParser(); 00120 void testStream(); 00121 }; 00122 00123 void FlatFileExecStreamTest::testBuffer() 00124 { 00125 FixedBuffer fixedBuffer[8]; 00126 std::string path = "flatfile/buffer"; 00127 00128 SharedFlatFileBuffer pFileBuffer; 00129 pFileBuffer.reset(new FlatFileBuffer(path), ClosableObjectDestructor()); 00130 pFileBuffer->open(); 00131 pFileBuffer->setStorage((char *) fixedBuffer, (uint)8); 00132 00133 checkRead(*pFileBuffer, "12345671"); 00134 BOOST_CHECK_EQUAL(pFileBuffer->getReadPtr(), (char *)fixedBuffer); 00135 00136 pFileBuffer->setReadPtr(pFileBuffer->getReadPtr() + 7); 00137 checkRead(*pFileBuffer, "12345676"); 00138 00139 pFileBuffer->setReadPtr(pFileBuffer->getReadPtr() + 6); 00140 checkRead(*pFileBuffer, "7654\n"); 00141 BOOST_CHECK(pFileBuffer->isComplete()); 00142 } 00143 00144 void FlatFileExecStreamTest::testParser() 00145 { 00146 FlatFileParser parser(',', '\n', '"', '"'); 00147 00148 checkTrim(parser, "", ""); 00149 checkTrim(parser, "aRobin", "aRobin"); 00150 checkTrim(parser, " red breast in cage ", "red breast in cage"); 00151 00152 checkStrip(parser, "", ""); 00153 checkStrip(parser, "puts all", "puts all"); 00154 checkStrip(parser, ""heaven"", "heaven"); 00155 checkStrip(parser, " "in a" ", "in a"); 00156 checkStrip(parser, " """rage""" ", ""rage""); 00157 00158
00159 checkColumnScan( 00160 parser, ""all that\n is "gold, ", 00161 FlatFileColumnParseResult::FIELD_DELIM, 19, 20); 00162 00163
00164 checkColumnScan( 00165 parser, ""does not, glitter"\n ", 00166 FlatFileColumnParseResult::ROW_DELIM, 19, 20); 00167 00168
00169 checkColumnScan( 00170 parser, ""not all those who ""wander""", ", 00171 FlatFileColumnParseResult::FIELD_DELIM, 30, 31); 00172 00173
00174 checkColumnScan( 00175 parser, " are lost "", 00176 FlatFileColumnParseResult::NO_DELIM, 12, 12); 00177 00178
00179 checkColumnScan( 00180 parser, ""JRR, ", 00181 FlatFileColumnParseResult::NO_DELIM, 6, 6); 00182 00183
00184 checkColumnScan( 00185 parser, ""Tolkien" , ", 00186 FlatFileColumnParseResult::FIELD_DELIM, 11, 12); 00187 00188
00189 checkColumnScan( 00190 parser, "some poems", 00191 FlatFileColumnParseResult::NO_DELIM, 10, 10); 00192 } 00193 00194 void FlatFileExecStreamTest::checkRead( 00195 FlatFileBuffer &buffer, 00196 const char *string) 00197 { 00198 uint size = strlen(string); 00199 buffer.read(); 00200 BOOST_CHECK_EQUAL(buffer.getEndPtr() - buffer.getReadPtr(), size); 00201 BOOST_CHECK_EQUAL(strncmp(buffer.getReadPtr(), string, size), 0); 00202 } 00203 00204 void FlatFileExecStreamTest::checkTrim( 00205 FlatFileParser &parser, 00206 const char *string, 00207 const char *result) 00208 { 00209 char buffer[128]; 00210 assert (strlen(string) < sizeof(buffer)); 00211 strcpy(buffer, string); 00212 00213 uint size = strlen(result); 00214 BOOST_CHECK_EQUAL(parser.trim(buffer, strlen(buffer)), size); 00215 BOOST_CHECK_EQUAL(strncmp(buffer, result, size), 0); 00216 } 00217 00218 void FlatFileExecStreamTest::checkStrip( 00219 FlatFileParser &parser, 00220 const char *string, 00221 const char *result) 00222 { 00223 char buffer[128]; 00224 assert (strlen(string) < sizeof(buffer)); 00225 strcpy(buffer, string); 00226 00227 uint size = strlen(result); 00228 BOOST_CHECK_EQUAL(parser.stripQuoting(buffer, strlen(buffer), true), size); 00229 BOOST_CHECK_EQUAL(strncmp(buffer, result, size), 0); 00230 } 00231 00232 void FlatFileExecStreamTest::checkColumnScan( 00233 FlatFileParser &parser, 00234 const char *string, 00235 FlatFileColumnParseResult::DelimiterType type, 00236 uint size, 00237 uint offset) 00238 { 00239 char buffer[128]; 00240 assert(strlen(string) < sizeof(buffer)); 00241 strcpy(buffer, string); 00242 00243 FlatFileColumnParseResult result; 00244 parser.scanColumn(buffer, strlen(buffer), sizeof(buffer), result); 00245 00246 BOOST_CHECK_EQUAL(result.type, type); 00247 BOOST_CHECK_EQUAL(result.size, size); 00248 BOOST_CHECK_EQUAL(result.next, buffer + offset); 00249 } 00250 00251 void FlatFileExecStreamTest::testStream() 00252 { 00253 StandardTypeDescriptorFactory stdTypeFactory; 00254 TupleAttributeDescriptor attrDesc( 00255 stdTypeFactory.newDataType(STANDARD_TYPE_VARCHAR), 00256 false, 00257 32); 00258 00259 FlatFileExecStreamParams flatfileParams; 00260 flatfileParams.scratchAccessor = 00261 pSegmentFactory->newScratchSegment(pCache,1); 00262 flatfileParams.outputTupleDesc.push_back(attrDesc); 00263 flatfileParams.outputTupleDesc.push_back(attrDesc); 00264 flatfileParams.dataFilePath = "flatfile/stream"; 00265 flatfileParams.fieldDelim = ','; 00266 flatfileParams.rowDelim = '\n'; 00267 flatfileParams.quoteChar = '"'; 00268 flatfileParams.escapeChar = '\'; 00269 flatfileParams.header = false; 00270 00271 ExecStreamEmbryo flatfileStreamEmbryo; 00272 flatfileStreamEmbryo.init( 00273 FlatFileExecStream::newFlatFileExecStream(), flatfileParams); 00274 flatfileStreamEmbryo.getStream()->setName("FlatFileExecStream"); 00275 00276 SharedExecStream pOutputStream = prepareSourceGraph(flatfileStreamEmbryo); 00277 StringExecStreamGeneratorImpl verifier; 00278 verifier.insert("[ 'No one', 'travels' ]"); 00279 verifier.insert("[ 'Along this way', 'but I,' ]"); 00280 verifier.insert("[ 'This', 'autumn evening.' ]"); 00281 00282 verifyOutput( 00283 pOutputStream, 00284 3, 00285 verifier); 00286 } 00287 00288 void FlatFileExecStreamTest::verifyOutput( 00289 ExecStream &stream, 00290 uint nRowsExpected, 00291 StringExecStreamGenerator &generator) 00292 { 00293
00294
00295 00296 pResourceGovernor->requestResources(
pGraph); 00297 pGraph->open(); 00298 pScheduler->start(); 00299 uint nRows = 0; 00300 for (;;) { 00301 ExecStreamBufAccessor &bufAccessor = 00302 pScheduler->readStream(stream); 00303 if (bufAccessor.getState() == EXECBUF_EOS) { 00304 break; 00305 } 00306 BOOST_REQUIRE(bufAccessor.isConsumptionPossible()); 00307 const uint nCol = 00308 bufAccessor.getConsumptionTupleAccessor().size(); 00309 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size()); 00310 BOOST_REQUIRE(nCol >= 1); 00311 TupleData inputTuple; 00312 inputTuple.compute(bufAccessor.getTupleDesc()); 00313 std::ostringstream oss; 00314 TuplePrinter tuplePrinter; 00315 for (;;) { 00316 if (!bufAccessor.demandData()) { 00317 break; 00318 } 00319 BOOST_REQUIRE(nRows < nRowsExpected); 00320 bufAccessor.unmarshalTuple(inputTuple); 00321 tuplePrinter.print(oss,bufAccessor.getTupleDesc(),inputTuple); 00322 std::string actualValue = oss.str(); 00323 oss.str(""); 00324 const std::string &expectedValue = generator.generateValue(nRows); 00325 if (actualValue.compare(expectedValue)) { 00326 std::cout << "(Row) = (" << nRows << ")" << std::endl; 00327 BOOST_CHECK_EQUAL(expectedValue,actualValue); 00328 return; 00329 } 00330 bufAccessor.consumeTuple(); 00331 ++nRows; 00332 } 00333 } 00334 BOOST_CHECK_EQUAL(nRowsExpected,nRows); 00335 } 00336 00337 FENNEL_UNIT_TEST_SUITE(FlatFileExecStreamTest); 00338 00339