Fennel: /home/pub/open/dev/fennel/calculator/CalcExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/calculator/CalcExcn.h" 00026 #include "fennel/calculator/CalcExecStream.h" 00027 #include "fennel/exec/ExecStreamGraph.h" 00028 #include "fennel/exec/ExecStreamBufAccessor.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $"); 00031 00032 void CalcExecStream::prepare(CalcExecStreamParams const &params) 00033 { 00034 ConduitExecStream::prepare(params); 00035 stopOnCalcError = params.stopOnCalcError; 00036 00037 try { 00038
00039 (void) CalcInit::instance(); 00040 00041 pCalc.reset(new Calculator(pDynamicParamManager.get())); 00042 if (isTracing()) { 00043 pCalc->initTraceSource(getSharedTraceTarget(), "calc"); 00044 } 00045 00046 pCalc->assemble(params.program.c_str()); 00047 00048 if (params.isFilter) { 00049 pFilterDatum = &((*(pCalc->getStatusRegister()))[0]); 00050 } else { 00051 pFilterDatum = NULL; 00052 } 00053 00054 FENNEL_TRACE( 00055 TRACE_FINER, 00056 "calc program = " 00057 << std::endl << params.program); 00058 00059 FENNEL_TRACE( 00060 TRACE_FINER, 00061 "calc input TupleDescriptor = " 00062 << pCalc->getInputRegisterDescriptor()); 00063 00064 inputDesc = pInAccessor->getTupleDesc(); 00065 FENNEL_TRACE( 00066 TRACE_FINER, 00067 "xo input TupleDescriptor = " 00068 << inputDesc); 00069 00070 FENNEL_TRACE( 00071 TRACE_FINER, 00072 "calc output TupleDescriptor = " 00073 << pCalc->getOutputRegisterDescriptor()); 00074 00075 FENNEL_TRACE( 00076 TRACE_FINER, 00077 "xo output TupleDescriptor = " 00078 << params.outputTupleDesc); 00079 00080 assert(inputDesc.storageEqual(pCalc->getInputRegisterDescriptor())); 00081 00082 TupleDescriptor outputDesc = pCalc->getOutputRegisterDescriptor(); 00083 00084 if (!params.outputTupleDesc.empty()) { 00085 assert(outputDesc.storageEqual(params.outputTupleDesc)); 00086 00087
00088
00089 outputDesc = params.outputTupleDesc; 00090 } 00091 pOutAccessor->setTupleShape( 00092 outputDesc, 00093 pInAccessor->getTupleFormat()); 00094 00095 inputData.compute(inputDesc); 00096 00097 outputData.compute(outputDesc); 00098 00099
00100 pCalc->bind(&inputData,&outputData); 00101 00102
00103
00104
00105
00106 pCalc->continueOnException(false); 00107 } catch (FennelExcn e) { 00108 FENNEL_TRACE( 00109 TRACE_SEVERE, 00110 "error preparing calculator: " << e.getMessage()); 00111 throw e; 00112 } 00113 } 00114 00115 void CalcExecStream::open(bool restart) 00116 { 00117 ConduitExecStream::open(restart); 00118 00119
00120 if (pCalc != NULL) { 00121 pCalc->zeroStatusRegister(); 00122 } 00123 } 00124 00125 ExecStreamResult CalcExecStream::execute(ExecStreamQuantum const &quantum) 00126 { 00127 ExecStreamResult rc = precheckConduitBuffers(); 00128 if (rc != EXECRC_YIELD) { 00129 return rc; 00130 } 00131 00132 #define TRACE_RETURN
00133 FENNEL_TRACE(TRACE_FINE, "read " << nRead << " rows, wrote " << nWritten) 00134 00135 FENNEL_TRACE(TRACE_FINER, "start execute loop"); 00136 uint nRead = 0; 00137 uint nWritten = 0; 00138 while (nRead < quantum.nTuplesMax) { 00139 while (pInAccessor->isTupleConsumptionPending()) { 00140 if (pInAccessor->demandData()) { 00141 TRACE_RETURN; 00142 return EXECRC_BUF_UNDERFLOW; 00143 } 00144 00145 FENNEL_TRACE(TRACE_FINER, "input row " << nRead); 00146 pInAccessor->unmarshalTuple(inputData); 00147 try { 00148 pCalc->exec(); 00149 } catch (FennelExcn e) { 00150 FENNEL_TRACE( 00151 TRACE_SEVERE, 00152 "error executing calculator: " << e.getMessage()); 00153 throw e; 00154 } 00155 bool skip = false; 00156 if (! pCalc->mWarnings.empty()) { 00157
00158
00159
00160
00161 FENNEL_TRACE( 00162 TRACE_WARNING, "calculator error " << pCalc->warnings()); 00163 if (stopOnCalcError) { 00164 throw CalcExcn(pCalc->warnings(), inputDesc, inputData); 00165 } 00166 skip = true; 00167 } else if (pFilterDatum) { 00168 bool filterDiscard = 00169 *reinterpret_cast<bool const *>(pFilterDatum->pData); 00170 if (filterDiscard) { 00171 skip = true; 00172 } 00173 } 00174 if (skip) { 00175 FENNEL_TRACE(TRACE_FINER, "skip row " << nRead); 00176 pInAccessor->consumeTuple(); 00177 ++nRead; 00178 } 00179 } 00180 00181 FENNEL_TRACE(TRACE_FINER, "output row " << nWritten); 00182 if (pOutAccessor->produceTuple(outputData)) { 00183 TRACE_RETURN; 00184 return EXECRC_BUF_OVERFLOW; 00185 } 00186 ++nWritten; 00187 pInAccessor->consumeTuple(); 00188 ++nRead; 00189 } 00190 TRACE_RETURN; 00191 return EXECRC_QUANTUM_EXPIRED; 00192 00193 #undef TRACE_RETURN 00194 } 00195 00196 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/calculator/CalcExecStream.cpp#2 $"); 00197 00198