Fennel: /home/pub/open/dev/fennel/farrago/JavaSinkExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/farrago/JavaSinkExecStream.h" 00026 #include "fennel/farrago/JniUtil.h" 00027 #include "fennel/exec/ExecStreamGraph.h" 00028 #include "fennel/exec/ExecStreamScheduler.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 #include 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $"); 00033 00034 JavaSinkExecStream::JavaSinkExecStream() 00035 { 00036 lastResult = EXECRC_QUANTUM_EXPIRED; 00037 pStreamGraphHandle = NULL; 00038 javaFennelPipeTupleIterId = 0; 00039 javaFennelPipeTupleIter = NULL; 00040 } 00041 00042 void JavaSinkExecStream::prepare(JavaSinkExecStreamParams const &params) 00043 { 00044 SingleInputExecStream::prepare(params); 00045 pStreamGraphHandle = params.pStreamGraphHandle; 00046 javaFennelPipeTupleIterId = params.javaFennelPipeTupleIterId; 00047 00048 JniEnvAutoRef pEnv; 00049 jclass classFennelPipeTupleIter = pEnv->FindClass( 00050 "net/sf/farrago/runtime/FennelPipeTupleIter"); 00051 assert(classFennelPipeTupleIter); 00052 methFennelPipeTupleIter_write = pEnv->GetMethodID( 00053 classFennelPipeTupleIter, "write", "(Ljava/nio/ByteBuffer;I)V"); 00054 assert(methFennelPipeTupleIter_write); 00055 methFennelPipeTupleIter_getByteBuffer = pEnv->GetMethodID( 00056 classFennelPipeTupleIter, "getByteBuffer", "(I)Ljava/nio/ByteBuffer;"); 00057 assert(methFennelPipeTupleIter_getByteBuffer); 00058 00059 jclass classByteBuffer = pEnv->FindClass("java/nio/ByteBuffer"); 00060 assert(classByteBuffer); 00061 methByteBuffer_array = 00062 pEnv->GetMethodID(classByteBuffer, "array", "()[B"); 00063 assert(methByteBuffer_array); 00064 } 00065 00066 void JavaSinkExecStream::open(bool restart) 00067 { 00068 FENNEL_TRACE(TRACE_FINE, "open"); 00069 SingleInputExecStream::open(restart); 00070 00071
00072 JniEnvAutoRef pEnv; 00073 jlong hJavaFennelPipeTupleIter = pEnv->CallLongMethod( 00074 pStreamGraphHandle->javaRuntimeContext, 00075 JniUtil::methGetJavaStreamHandle, 00076 javaFennelPipeTupleIterId); 00077 javaFennelPipeTupleIter = 00078 CmdInterpreter::getObjectFromLong(hJavaFennelPipeTupleIter); 00079 assert(javaFennelPipeTupleIter); 00080 } 00081 00082 ExecStreamResult JavaSinkExecStream::execute(ExecStreamQuantum const &) 00083 { 00084 ExecStreamBufAccessor &inAccessor = *pInAccessor; 00085 switch (inAccessor.getState()) { 00086 case EXECBUF_EMPTY: 00087
00088
00089
00090 FENNEL_TRACE(TRACE_FINE, "no input"); 00091 return (lastResult = EXECRC_BUF_UNDERFLOW); 00092 case EXECBUF_EOS: 00093
00094
00095
00096 FENNEL_TRACE(TRACE_FINE, "input EOS"); 00097 assert(inAccessor.getConsumptionAvailable() == 0); 00098 break; 00099 default: 00100 FENNEL_TRACE(TRACE_FINER, "input rows:"); 00101 getGraph().getScheduler()-> 00102 traceStreamBufferContents(this, inAccessor, TRACE_FINER); 00103 break; 00104 } 00105 00106 PConstBuffer pInBufStart = inAccessor.getConsumptionStart(); 00107 PConstBuffer pInBufEnd = inAccessor.getConsumptionEnd(); 00108 uint nbytes = pInBufEnd - pInBufStart; 00109 sendData(pInBufStart, nbytes); 00110 if (nbytes > 0) { 00111 inAccessor.consumeData(pInBufEnd); 00112 return (lastResult = EXECRC_BUF_UNDERFLOW); 00113 } else { 00114 return (lastResult = EXECRC_EOS); 00115 } 00116 } 00117 00119 void JavaSinkExecStream::sendData(PConstBuffer src, uint size) 00120 { 00121 JniEnvAutoRef pEnv; 00122 00123
00124
00125
00126 jobject javaByteBuf = pEnv->CallObjectMethod( 00127 javaFennelPipeTupleIter, methFennelPipeTupleIter_getByteBuffer, size); 00128 assert(javaByteBuf); 00129 00130
00131 stuffByteBuffer(javaByteBuf, src, size); 00132 00133
00134
00135 FENNEL_TRACE( 00136 TRACE_FINE, 00137 "call FennelPipeTupleIter.write " << size << " bytes"); 00138 pEnv->CallVoidMethod( 00139 javaFennelPipeTupleIter, methFennelPipeTupleIter_write, 00140 javaByteBuf, size); 00141 FENNEL_TRACE(TRACE_FINE, "FennelPipeTupleIter.write returned"); 00142 } 00143 00144 void JavaSinkExecStream::stuffByteBuffer( 00145 jobject byteBuffer, 00146 PConstBuffer src, 00147 uint size) 00148 { 00149
00150
00151 JniEnvAutoRef pEnv; 00152 00153
00154 jbyteArray bufBacking = 00155 static_cast( 00156 pEnv->CallObjectMethod(byteBuffer, methByteBuffer_array)); 00157 jboolean copied; 00158 jbyte
dst = pEnv->GetByteArrayElements(bufBacking, &copied); 00159 00160
00161 memcpy(dst, src, size); 00162 00163
00164 if (isTracingLevel(TRACE_FINER)) { 00165
00166 ExecStreamBufAccessor ba; 00167 ba.setProvision(BUFPROV_PRODUCER); 00168 ba.setTupleShape( 00169 pInAccessor->getTupleDesc(), pInAccessor->getTupleFormat()); 00170 ba.clear(); 00171 PBuffer buf = (PBuffer) dst; 00172 ba.provideBufferForConsumption(buf, buf + size); 00173 FENNEL_TRACE(TRACE_FINER, "output rows:"); 00174 getGraph().getScheduler()-> 00175 traceStreamBufferContents(*this, ba, TRACE_FINER); 00176 } 00177 00178
00179 pEnv->ReleaseByteArrayElements(bufBacking, dst, 0); 00180 } 00181 00182 00183 void JavaSinkExecStream::closeImpl() 00184 { 00185 FENNEL_TRACE(TRACE_FINE, "closing"); 00186 00187
00188 if (javaFennelPipeTupleIter && (lastResult != EXECRC_EOS)) { 00189 FixedBuffer dummy[1]; 00190 sendData(dummy, 0); 00191 } 00192 00193 javaFennelPipeTupleIter = NULL; 00194 SingleInputExecStream::closeImpl(); 00195 } 00196 00197 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/JavaSinkExecStream.cpp#15 $"); 00198 00199