Fennel: /home/pub/open/dev/fennel/farrago/JavaTransformExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/exec/ExecStreamBufAccessor.h" 00025 #include "fennel/exec/ExecStreamGraph.h" 00026 #include "fennel/farrago/JavaTransformExecStream.h" 00027 #include "fennel/farrago/JniUtil.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $"); 00030 00031 JavaTransformExecStreamParams::JavaTransformExecStreamParams() 00032 { 00033 outputTupleFormat = TUPLE_FORMAT_STANDARD; 00034 javaClassName = ""; 00035 } 00036 00037 JavaTransformExecStream::JavaTransformExecStream() 00038 { 00039 pStreamGraphHandle = NULL; 00040 outputByteBuffer1 = NULL; 00041 outputByteBuffer2 = NULL; 00042 pBuffer1 = NULL; 00043 pBuffer2 = NULL; 00044 farragoTransform = NULL; 00045 } 00046 00047 JavaTransformExecStream::~JavaTransformExecStream() 00048 { 00049 } 00050 00051 void JavaTransformExecStream::setInputBufAccessors( 00052 std::vector const &inAccessorsInit) 00053 { 00054 inAccessors = inAccessorsInit; 00055 } 00056 00057 void JavaTransformExecStream::setOutputBufAccessors( 00058 std::vector const &outAccessors) 00059 { 00060 assert(outAccessors.size() <= 1); 00061 00062 if (outAccessors.size() > 0) { 00063 pOutAccessor = outAccessors[0]; 00064 } 00065 } 00066 00067 void JavaTransformExecStream::prepare( 00068 JavaTransformExecStreamParams const &params) 00069 { 00070 ExecStream::prepare(params); 00071 00072 if (pOutAccessor) { 00073 assert(pOutAccessor->getProvision() == getOutputBufProvision()); 00074 if (pOutAccessor->getTupleDesc().empty()) { 00075 assert(!params.outputTupleDesc.empty()); 00076 pOutAccessor->setTupleShape( 00077 params.outputTupleDesc, 00078 params.outputTupleFormat); 00079 } 00080 } 00081 00082 for (uint i = 0; i < inAccessors.size(); ++i) { 00083 assert(inAccessors[i]->getProvision() == getInputBufProvision()); 00084 } 00085 00086 javaClassName = params.javaClassName; 00087 pStreamGraphHandle = params.pStreamGraphHandle; 00088 } 00089 00090 void JavaTransformExecStream::open(bool restart) 00091 { 00092 FENNEL_TRACE(TRACE_FINER, "open" << (restart ? " (restart)" : "")); 00093 ExecStream::open(restart); 00094 00095 JniEnvAutoRef pEnv; 00096 if (restart) { 00097 if (pOutAccessor) { 00098 pOutAccessor->clear(); 00099 } 00100 00101
00102 for (uint i = 0; i < inAccessors.size(); ++i) { 00103 inAccessors[i]->clear(); 00104 pGraph->getStreamInput(getStreamId(),i)->open(true); 00105 } 00106 00107 assert(farragoTransform); 00108 pEnv->CallVoidMethod( 00109 farragoTransform, 00110 JniUtil::methFarragoTransformRestart, 00111 NULL); 00112 return; 00113 } 00114 00115
00116 FENNEL_TRACE(TRACE_FINER, "finding java peer, class " << javaClassName); 00117 jobject o = 00118 pEnv->CallObjectMethod( 00119 pStreamGraphHandle->javaRuntimeContext, 00120 JniUtil::methFarragoRuntimeContextFindFarragoTransform, 00121 pEnv->NewStringUTF(javaClassName.c_str())); 00122 assert(o); 00123 farragoTransform = pEnv->NewGlobalRef(o); 00124 } 00125 00126 00127 ExecStreamResult JavaTransformExecStream::execute( 00128 ExecStreamQuantum const &quantum) 00129 { 00130 FENNEL_TRACE(TRACE_FINEST, "execute"); 00131 00132 if (pOutAccessor) { 00133 switch (pOutAccessor->getState()) { 00134 case EXECBUF_NONEMPTY: 00135 case EXECBUF_OVERFLOW: 00136 FENNEL_TRACE(TRACE_FINER, "overflow"); 00137 return EXECRC_BUF_OVERFLOW; 00138 case EXECBUF_EOS: 00139 FENNEL_TRACE(TRACE_FINER, "eos"); 00140 return EXECRC_EOS; 00141 default: 00142 break; 00143 } 00144 } 00145 00146 checkEmptyInputs(); 00147 00148 jlong jquantum = static_cast(quantum.nTuplesMax); 00149 JniEnvAutoRef pEnv; 00150 assert(farragoTransform); 00151 PBuffer pBuffer; 00152 if (pOutAccessor) { 00153 pBuffer = pOutAccessor->getProductionStart(); 00154 if (outputByteBuffer1) { 00155 outputByteBuffer1 = pEnv->NewDirectByteBuffer( 00156 pBuffer, 00157 pOutAccessor->getProductionAvailable()); 00158 outputByteBuffer1 = pEnv->NewGlobalRef(outputByteBuffer1); 00159 pBuffer1 = pBuffer; 00160 } else if (outputByteBuffer2) { 00161 if (pBuffer1 != pBuffer) { 00162 outputByteBuffer2 = pEnv->NewDirectByteBuffer( 00163 pBuffer, 00164 pOutAccessor->getProductionAvailable()); 00165 outputByteBuffer2 = pEnv->NewGlobalRef(outputByteBuffer2); 00166 pBuffer2 = pBuffer; 00167 } 00168 } 00169 } else { 00170 pBuffer = NULL; 00171 } 00172 00173
00174
00175
00176 assert((pBuffer == pBuffer1) || (pBuffer == pBuffer2)); 00177 00178
00179
00180
00181
00182 00183 int cb = pEnv->CallIntMethod( 00184 farragoTransform, 00185 JniUtil::methFarragoTransformExecute, 00186 (pBuffer == pBuffer1) ? outputByteBuffer1 : outputByteBuffer2, 00187 jquantum); 00188 00189 if (cb > 0) { 00190 assert(pOutAccessor); 00191 pOutAccessor->produceData(pBuffer + cb); 00192 FENNEL_TRACE(TRACE_FINER, "wrote " << cb << " bytes"); 00193 return EXECRC_BUF_OVERFLOW; 00194 } else if (cb < 0) { 00195 FENNEL_TRACE(TRACE_FINER, "underflow"); 00196 checkEmptyInputs(); 00197
00198
00199 return EXECRC_BUF_UNDERFLOW; 00200 } else { 00201 FENNEL_TRACE(TRACE_FINER, "marking EOS"); 00202 if (pOutAccessor) { 00203 pOutAccessor->markEOS(); 00204 } 00205 return EXECRC_EOS; 00206 } 00207 } 00208 00209 void JavaTransformExecStream::checkEmptyInputs() 00210 { 00211 for (uint i = 0; i < inAccessors.size(); ++i) { 00212 SharedExecStreamBufAccessor inAccessor = inAccessors[i]; 00213 if (inAccessor->getState() == EXECBUF_EMPTY) { 00214 inAccessor->requestProduction(); 00215 } 00216 } 00217 } 00218 00219 void JavaTransformExecStream::closeImpl() 00220 { 00221 JniEnvAutoRef pEnv; 00222 if (farragoTransform) { 00223 pEnv->DeleteGlobalRef(farragoTransform); 00224 farragoTransform = NULL; 00225 } 00226 if (outputByteBuffer1) { 00227 pEnv->DeleteGlobalRef(outputByteBuffer1); 00228 outputByteBuffer1 = NULL; 00229 } 00230 if (outputByteBuffer2) { 00231 pEnv->DeleteGlobalRef(outputByteBuffer2); 00232 outputByteBuffer2 = NULL; 00233 } 00234 pBuffer1 = NULL; 00235 pBuffer2 = NULL; 00236 ExecStream::closeImpl(); 00237 } 00238 00239 ExecStreamBufProvision JavaTransformExecStream::getInputBufProvision() const 00240 { 00241 return BUFPROV_PRODUCER; 00242 } 00243 00244 ExecStreamBufProvision JavaTransformExecStream::getOutputBufProvision() const 00245 { 00246 return BUFPROV_CONSUMER; 00247 } 00248 00249 ExecStreamBufProvision JavaTransformExecStream::getOutputBufConversion() const 00250 { 00251
00252
00253
00254
00255
00256 return BUFPROV_PRODUCER; 00257 } 00258 00259 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaTransformExecStream.cpp#10 $"); 00260 00261