Fennel: /home/pub/open/dev/fennel/exec/ExecStreamBufAccessor.h Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #ifndef Fennel_ExecStreamBufAccessor_Included 00025 #define Fennel_ExecStreamBufAccessor_Included 00026 00027 #include "fennel/exec/ExecStreamDefs.h" 00028 #include "fennel/tuple/TupleDescriptor.h" 00029 #include "fennel/tuple/TupleFormat.h" 00030 #include "fennel/tuple/TupleAccessor.h" 00031 #include "fennel/tuple/TupleProjectionAccessor.h" 00032 #include "fennel/tuple/TupleOverflowExcn.h" 00033 00034 #include <boost/utility.hpp> 00035 00036 FENNEL_BEGIN_NAMESPACE 00037 00045 class FENNEL_EXEC_EXPORT ExecStreamBufAccessor 00046 : public boost::noncopyable 00047 { 00048 PBuffer pBufStart; 00049 00050 PBuffer pBufEnd; 00051 00052 PBuffer pProducer; 00053 00054 PBuffer pConsumer; 00055 00056 ExecStreamBufProvision provision; 00057 00058 ExecStreamBufState state; 00059 00060 bool pendingEOS; 00061 00062 TupleDescriptor tupleDesc; 00063 00064 TupleFormat tupleFormat; 00065 00066 TupleAccessor tupleProductionAccessor; 00067 00068 TupleAccessor tupleConsumptionAccessor; 00069 00070 TupleProjectionAccessor tupleProjectionAccessor; 00071 00072 uint cbBuffer; 00073 00075 inline void setEOS(); 00076 00077 public: 00078 inline explicit ExecStreamBufAccessor(); 00079 00080 virtual ~ExecStreamBufAccessor() 00081 { 00082 } 00083 00089 inline void setProvision(ExecStreamBufProvision provision); 00090 00098 inline void setTupleShape( 00099 TupleDescriptor const &tupleDesc, 00100 TupleFormat tupleFormat = TUPLE_FORMAT_STANDARD); 00101 00105 inline void clear(); 00106 00117 inline void provideBufferForProduction( 00118 PBuffer pStart, 00119 PBuffer pEnd, 00120 bool reusable); 00121 00130 inline void provideBufferForConsumption( 00131 PConstBuffer pStart, 00132 PConstBuffer pEnd); 00133 00138 inline void requestProduction(); 00139 00144 inline void requestConsumption(); 00145 00150 inline bool isProductionPossible() const; 00151 00156 inline bool isConsumptionPossible() const; 00157 00164 inline bool demandData(); 00165 00171 inline void markEOS(); 00172 00188 inline void produceData(PBuffer pEnd); 00189 00205 inline void consumeData(PConstBuffer pEnd); 00206 00212 inline PConstBuffer getConsumptionStart() const; 00213 00219 inline PConstBuffer getConsumptionEnd() const; 00220 00227 inline uint getConsumptionAvailable() const; 00228 00239 uint getConsumptionAvailableBounded(uint cbLimit); 00240 00246 inline uint getConsumptionTuplesAvailable(); 00247 00256 inline PConstBuffer spanWholeTuples(PConstBuffer start, uint size); 00257 00265 inline PBuffer getProductionStart() const; 00266 00274 inline PBuffer getProductionEnd() const; 00275 00282 inline uint getProductionAvailable() const; 00283 00289 inline ExecStreamBufState getState() const; 00290 00295 inline bool hasPendingEOS() const; 00296 00302 inline ExecStreamBufProvision getProvision() const; 00303 00309 inline TupleDescriptor const &getTupleDesc() const; 00310 00316 inline TupleFormat getTupleFormat() const; 00317 00324 inline void validateTupleSize(TupleData const &tupleData); 00325 00335 inline bool produceTuple(TupleData const &tupleData); 00336 00344 inline TupleAccessor &accessConsumptionTuple(); 00345 00355 inline void unmarshalTuple(TupleData &tupleData, uint iFirstDatum = 0); 00356 00361 inline void consumeTuple(); 00362 00367 inline bool isTupleConsumptionPending() const; 00368 00372 inline TupleAccessor &getConsumptionTupleAccessor(); 00373 00378 inline TupleAccessor &getScratchTupleAccessor(); 00379 00385 inline void bindProjection(TupleProjection const &inputProj); 00386 00392 inline void unmarshalProjectedTuple(TupleData &projTupleData); 00393 }; 00394 00395 inline ExecStreamBufAccessor::ExecStreamBufAccessor() 00396 { 00397 clear(); 00398 provision = BUFPROV_NONE; 00399 state = EXECBUF_EOS; 00400 tupleFormat = TUPLE_FORMAT_STANDARD; 00401 cbBuffer = 0; 00402 } 00403 00404 inline bool ExecStreamBufAccessor::isProductionPossible() const 00405 { 00406 return pendingEOS && (state != EXECBUF_EOS) && (state != EXECBUF_OVERFLOW); 00407 } 00408 00409 inline bool ExecStreamBufAccessor::isConsumptionPossible() const 00410 { 00411 return (state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY); 00412 } 00413 00414 inline void ExecStreamBufAccessor::setProvision( 00415 ExecStreamBufProvision provisionInit) 00416 { 00417 assert(provision == BUFPROV_NONE); 00418 provision = provisionInit; 00419 } 00420 00421 inline void ExecStreamBufAccessor::setTupleShape( 00422 TupleDescriptor const &tupleDescInit, 00423 TupleFormat tupleFormatInit) 00424 { 00425 tupleDesc = tupleDescInit; 00426 tupleFormat = tupleFormatInit; 00427 tupleProductionAccessor.compute(tupleDesc, tupleFormat); 00428 tupleConsumptionAccessor.compute(tupleDesc, tupleFormat); 00429 } 00430 00431 inline void ExecStreamBufAccessor::clear() 00432 { 00433 pBufStart = NULL; 00434 pBufEnd = NULL; 00435 pProducer = NULL; 00436 pConsumer = NULL; 00437 cbBuffer = 0; 00438 state = EXECBUF_EMPTY; 00439 pendingEOS = false; 00440 tupleProductionAccessor.resetCurrentTupleBuf(); 00441 tupleConsumptionAccessor.resetCurrentTupleBuf(); 00442 } 00443 00444 inline void ExecStreamBufAccessor::provideBufferForProduction( 00445 PBuffer pStart, 00446 PBuffer pEnd, 00447 bool reusable) 00448 { 00449 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY)); 00450 assert(provision == BUFPROV_CONSUMER); 00451 pBufStart = pStart; 00452 pBufEnd = pEnd; 00453 pProducer = pStart; 00454 pConsumer = pStart; 00455 cbBuffer = pEnd - pStart; 00456 state = EXECBUF_UNDERFLOW; 00457 00458 if (!reusable) { 00459
00460 pBufStart = NULL; 00461 } 00462 } 00463 00464 inline void ExecStreamBufAccessor::provideBufferForConsumption( 00465 PConstBuffer pStart, 00466 PConstBuffer pEnd) 00467 { 00468 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY)); 00469 assert(provision == BUFPROV_PRODUCER); 00470 pBufStart = const_cast(pStart); 00471 pBufEnd = const_cast(pEnd); 00472 pConsumer = pBufStart; 00473 pProducer = pBufEnd; 00474 state = EXECBUF_OVERFLOW; 00475 00476
00477 pBufStart = NULL; 00478 } 00479 00480 inline void ExecStreamBufAccessor::requestProduction() 00481 { 00482 assert((state == EXECBUF_UNDERFLOW) || (state == EXECBUF_EMPTY)); 00483 state = EXECBUF_UNDERFLOW; 00484 pProducer = pBufStart; 00485 pConsumer = pBufStart; 00486 } 00487 00488 inline void ExecStreamBufAccessor::requestConsumption() 00489 { 00490 assert((state == EXECBUF_OVERFLOW) || (state == EXECBUF_NONEMPTY)); 00491 state = EXECBUF_OVERFLOW; 00492 } 00493 00494 inline void ExecStreamBufAccessor::markEOS() 00495 { 00496 if (isConsumptionPossible()) { 00497 pendingEOS = true; 00498 return; 00499 } 00500 setEOS(); 00501 } 00502 00503 inline void ExecStreamBufAccessor::setEOS() 00504 { 00505 assert(pProducer == pConsumer); 00506 clear(); 00507 state = EXECBUF_EOS; 00508 } 00509 00510 inline PConstBuffer ExecStreamBufAccessor::getConsumptionStart() const 00511 { 00512 return pConsumer; 00513 } 00514 00515 inline PConstBuffer ExecStreamBufAccessor::getConsumptionEnd() const 00516 { 00517 return pProducer; 00518 } 00519 00520 inline uint ExecStreamBufAccessor::getConsumptionAvailable() const 00521 { 00522 return getConsumptionEnd() - getConsumptionStart(); 00523 } 00524 00525 inline uint ExecStreamBufAccessor::getConsumptionTuplesAvailable() 00526 { 00527 TupleAccessor& acc = getScratchTupleAccessor(); 00528 PConstBuffer p = getConsumptionStart(), 00529 end = getConsumptionEnd(); 00530 int count = 0; 00531 while (p < end) { 00532 acc.setCurrentTupleBuf(p); 00533 p += acc.getCurrentByteCount(); 00534 ++count; 00535 } 00536 return count; 00537 } 00538 00539 inline PConstBuffer ExecStreamBufAccessor::spanWholeTuples( 00540 PConstBuffer start, uint size) 00541 { 00542 TupleAccessor& acc = getScratchTupleAccessor(); 00543 assert(size > 0); 00544 PConstBuffer p = start; 00545 PConstBuffer pend = start + size; 00546 for (int ct = 0; ; ct++) { 00547 assert(p < pend); 00548 acc.setCurrentTupleBuf(p); 00549 PConstBuffer q = p; 00550 p += acc.getCurrentByteCount(); 00551 if (p >= pend) { 00552 if (p == pend) { 00553
00554 return p; 00555 } else { 00556
00557 return q; 00558 } 00559 } 00560 } 00561 assert(false); 00562 } 00563 00564 00565 inline PBuffer ExecStreamBufAccessor::getProductionStart() const 00566 { 00567 return pProducer; 00568 } 00569 00570 inline PBuffer ExecStreamBufAccessor::getProductionEnd() const 00571 { 00572 return pBufEnd; 00573 } 00574 00575 inline uint ExecStreamBufAccessor::getProductionAvailable() const 00576 { 00577 return pProducer ? (pBufEnd - pProducer) : 0; 00578 } 00579 00580 inline ExecStreamBufState ExecStreamBufAccessor::getState() const 00581 { 00582 return state; 00583 } 00584 00585 inline bool ExecStreamBufAccessor::hasPendingEOS() const 00586 { 00587 return pendingEOS; 00588 } 00589 00590 inline ExecStreamBufProvision ExecStreamBufAccessor::getProvision() const 00591 { 00592 return provision; 00593 } 00594 00595 inline TupleFormat ExecStreamBufAccessor::getTupleFormat() const 00596 { 00597 return tupleFormat; 00598 } 00599 00600 inline TupleDescriptor const &ExecStreamBufAccessor::getTupleDesc() const 00601 { 00602 return tupleDesc; 00603 } 00604 00605 inline void ExecStreamBufAccessor::produceData(PBuffer pEnd) 00606 { 00607 assert(isProductionPossible()); 00608 assert(pEnd > getProductionStart()); 00609 assert(pEnd <= getProductionEnd()); 00610 pProducer = pEnd; 00611 state = EXECBUF_NONEMPTY; 00612 } 00613 00614 inline void ExecStreamBufAccessor::consumeData(PConstBuffer pEnd) 00615 { 00616 assert(isConsumptionPossible()); 00617 assert(pEnd > getConsumptionStart()); 00618 assert(pEnd <= getConsumptionEnd()); 00619 pConsumer = const_cast(pEnd); 00620 if (pConsumer == getConsumptionEnd()) { 00621 if (pendingEOS) { 00622 setEOS(); 00623 } else { 00624 state = EXECBUF_EMPTY; 00625 } 00626 } else { 00627
00628
00629 state = EXECBUF_NONEMPTY; 00630 } 00631 } 00632 00633 inline void ExecStreamBufAccessor::validateTupleSize( 00634 TupleData const &tupleData) 00635 { 00636 if (cbBuffer == 0) { 00637 return;
00638 } 00639 if (tupleProductionAccessor.isBufferSufficient(tupleData, cbBuffer)) { 00640 uint cbTuple = tupleProductionAccessor.getByteCount(tupleData); 00641 throw TupleOverflowExcn(tupleDesc, tupleData, cbTuple, cbBuffer); 00642 } 00643 } 00644 00645 inline bool ExecStreamBufAccessor::produceTuple(TupleData const &tupleData) 00646 { 00647 assert(getState() != EXECBUF_EOS); 00648 assert(pendingEOS); 00649 00650 if (tupleProductionAccessor.isBufferSufficient( 00651 tupleData, getProductionAvailable())) 00652 { 00653 tupleProductionAccessor.marshal(tupleData, getProductionStart()); 00654 produceData( 00655 getProductionStart() 00656 + tupleProductionAccessor.getCurrentByteCount()); 00657 return true; 00658 } else { 00659 validateTupleSize(tupleData); 00660 if (getState() == EXECBUF_NONEMPTY) { 00661 requestConsumption(); 00662 } 00663 return false; 00664 } 00665 } 00666 00667 inline TupleAccessor &ExecStreamBufAccessor::accessConsumptionTuple() 00668 { 00669 assert(isConsumptionPossible()); 00670 assert(tupleConsumptionAccessor.getCurrentTupleBuf()); 00671 00672 tupleConsumptionAccessor.setCurrentTupleBuf(getConsumptionStart()); 00673 return tupleConsumptionAccessor; 00674 } 00675 00676 inline void ExecStreamBufAccessor::unmarshalTuple( 00677 TupleData &tupleData, uint iFirstDatum) 00678 { 00679 accessConsumptionTuple(); 00680 tupleConsumptionAccessor.unmarshal(tupleData, iFirstDatum); 00681 } 00682 00683 inline void ExecStreamBufAccessor::consumeTuple() 00684 { 00685 assert(tupleConsumptionAccessor.getCurrentTupleBuf()); 00686 00687 consumeData( 00688 getConsumptionStart() + tupleConsumptionAccessor.getCurrentByteCount()); 00689 tupleConsumptionAccessor.resetCurrentTupleBuf(); 00690 } 00691 00692 inline bool ExecStreamBufAccessor::isTupleConsumptionPending() const 00693 { 00694 if (tupleConsumptionAccessor.getCurrentTupleBuf()) { 00695 return true; 00696 } else { 00697 return false; 00698 } 00699 } 00700 00701 inline TupleAccessor &ExecStreamBufAccessor::getConsumptionTupleAccessor() 00702 { 00703 return tupleConsumptionAccessor; 00704 } 00705 00706 inline TupleAccessor &ExecStreamBufAccessor::getScratchTupleAccessor() 00707 { 00708
00709
00710 return tupleProductionAccessor; 00711 } 00712 00713 inline bool ExecStreamBufAccessor::demandData() 00714 { 00715 if (state == EXECBUF_EOS) { 00716 return false; 00717 } else if (isConsumptionPossible()) { 00718 return true; 00719 } else { 00720 requestProduction(); 00721 return false; 00722 } 00723 } 00724 00725 inline void ExecStreamBufAccessor::bindProjection( 00726 TupleProjection const &inputProj) 00727 { 00728 tupleProjectionAccessor.bind(tupleConsumptionAccessor, inputProj); 00729 } 00730 00731 inline void ExecStreamBufAccessor::unmarshalProjectedTuple( 00732 TupleData &projTupleData) 00733 { 00734 accessConsumptionTuple(); 00735 tupleProjectionAccessor.unmarshal(projTupleData); 00736 } 00737 00738 FENNEL_END_NAMESPACE 00739 00740 #endif 00741 00742