Fennel: /home/pub/open/dev/fennel/exec/ExecStreamGraphImpl.h Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #ifndef Fennel_ExecStreamGraphImpl_Included 00025 #define Fennel_ExecStreamGraphImpl_Included 00026 00027 #include "fennel/exec/ExecStreamGraph.h" 00028 00029 #include 00030 #include <boost/property_map.hpp> 00031 #include <boost/graph/adjacency_list.hpp> 00032 #include <boost/graph/properties.hpp> 00033 #include <boost/graph/filtered_graph.hpp> 00034 00035 00036 namespace boost 00037 { 00038 enum vertex_data_t { vertex_data }; 00039 enum edge_data_t { edge_data }; 00040 BOOST_INSTALL_PROPERTY(vertex,data); 00041 BOOST_INSTALL_PROPERTY(edge,data); 00042 } 00043 00044 FENNEL_BEGIN_NAMESPACE 00045 00050 class FENNEL_EXEC_EXPORT ExecStreamGraphImpl 00051 : virtual public ExecStreamGraph 00052 { 00053 public: 00054 typedef boost::adjacency_list< 00055 boost::vecS, 00056 boost::vecS, 00057 boost::bidirectionalS, 00058 boost::propertyboost::vertex_data_t,SharedExecStream, 00059 boost::property< 00060 boost::edge_data_t,SharedExecStreamBufAccessor, 00061 boost::propertyboost::edge_weight_t,int > > 00062 FullGraphRep; 00063 00064 typedef boost::graph_traits::vertex_descriptor Vertex; 00065 typedef boost::graph_traits::edge_descriptor Edge; 00066 00067 typedef boost::graph_traits::vertex_iterator FgVertexIter; 00068 typedef boost::graph_traits::edge_iterator FgEdgeIter; 00069 typedef boost::graph_traits::out_edge_iterator FgOutEdgeIter; 00070 typedef boost::graph_traits::in_edge_iterator FgInEdgeIter; 00071 typedef std::pair<FgVertexIter,FgVertexIter> FgVertexIterPair; 00072 typedef std::pair<FgEdgeIter,FgEdgeIter> FgEdgeIterPair; 00073 typedef std::pair<FgOutEdgeIter,FgOutEdgeIter> FgOutEdgeIterPair; 00074 typedef std::pair<FgInEdgeIter,FgInEdgeIter> FgInEdgeIterPair; 00075 00076 typedef boost::property_map<FullGraphRep, boost::edge_weight_t>::type 00077 EdgeWeightMap; 00078 00079 struct ExplicitEdgePredicate 00080 { 00081 EdgeWeightMap weightMap; 00082 00083
00084
00085 00086 ExplicitEdgePredicate() 00087 { 00088 } 00089 00090 ExplicitEdgePredicate(EdgeWeightMap weightMapInit) 00091 : weightMap(weightMapInit) 00092 { 00093 } 00094 00095 bool operator () (Edge const &edge) const 00096 { 00097 return boost::get(weightMap, edge) > 0; 00098 } 00099 }; 00100 00101 typedef boost::filtered_graph<FullGraphRep, ExplicitEdgePredicate> 00102 GraphRep; 00103 typedef boost::graph_traits::vertex_iterator VertexIter; 00104 typedef boost::graph_traits::edge_iterator EdgeIter; 00105 typedef boost::graph_traits::out_edge_iterator OutEdgeIter; 00106 typedef boost::graph_traits::in_edge_iterator InEdgeIter; 00107 typedef std::pair<VertexIter,VertexIter> VertexIterPair; 00108 typedef std::pair<EdgeIter,EdgeIter> EdgeIterPair; 00109 typedef std::pair<OutEdgeIter,OutEdgeIter> OutEdgeIterPair; 00110 typedef std::pair<InEdgeIter,InEdgeIter> InEdgeIterPair; 00111 00112 00113 protected: 00114 00115
00116
00117
00118
00119
00120
00121 00122 FullGraphRep graphRep; 00123 00124 GraphRep filteredGraph; 00125 00126 typedef std::mapstd::string,ExecStreamId StreamMap; 00127 typedef StreamMap::const_iterator StreamMapConstIter; 00128 typedef std::map<std::pair<std::string, uint>,ExecStreamId> EdgeMap; 00129 00130
00131 class DotGraphRenderer; 00132 class DotVertexRenderer; 00133 class DotEdgeRenderer; 00134 00138 std::vector freeVertices; 00139 00143 StreamMap streamMap; 00144 00148 EdgeMap streamOutMap; 00149 00153 std::vector sortedStreams; 00154 00158 SharedLogicalTxn pTxn; 00159 00163 SharedErrorTarget pErrorTarget; 00164 00168 SharedSegment pScratchSegment; 00169 00173 SharedExecStreamGovernor pResourceGovernor; 00174 00181 bool isOpen; 00182 00186 bool isPrepared; 00187 00191 bool doDataflowClose; 00192 00193 class DynamicParamInfo 00194 { 00195 public: 00196 std::vector readerStreamIds; 00197 std::vector writerStreamIds; 00198 }; 00199 00203 std::map<DynamicParamId, DynamicParamInfo> dynamicParamMap; 00204 00208 bool allowDummyTxnId; 00209 00210 virtual void closeImpl(); 00211 virtual void sortStreams(); 00212 virtual void openStream(SharedExecStream pStream); 00213 virtual void bindStreamBufAccessors(SharedExecStream pStream); 00214 virtual void mergeFrom(ExecStreamGraphImpl& src); 00215 virtual void mergeFrom( 00216 ExecStreamGraphImpl& src, 00217 std::vector const& nodes); 00218 00221 virtual void clear(); 00223 virtual Vertex addVertex(SharedExecStream pStream); 00224 00225
00227 Vertex newVertex(); 00229 void freeVertex(Vertex); 00230 00232 void removeFromStreamOutMap(SharedExecStream); 00233 00234 virtual Edge getInputEdge(ExecStreamId stream, uint iInput); 00235 virtual Edge getOutputEdge(ExecStreamId stream, uint iOutput); 00236 00237 public: 00238 explicit ExecStreamGraphImpl(); 00239 virtual ~ExecStreamGraphImpl() {} 00240 00241 inline GraphRep const &getGraphRep(); 00242 inline FullGraphRep const &getFullGraphRep(); 00243 inline SharedExecStream getStreamFromVertex(Vertex); 00244 inline SharedExecStreamBufAccessor &getSharedBufAccessorFromEdge(Edge); 00245 inline ExecStreamBufAccessor &getBufAccessorFromEdge(Edge); 00246 00247
00248 virtual void setTxn(SharedLogicalTxn pTxn); 00249 virtual void setErrorTarget(SharedErrorTarget pErrorTarget); 00250 virtual void setScratchSegment( 00251 SharedSegment pScratchSegment); 00252 virtual void setResourceGovernor( 00253 SharedExecStreamGovernor pResourceGovernor); 00254 virtual SharedLogicalTxn getTxn(); 00255 virtual TxnId getTxnId(); 00256 virtual void enableDummyTxnId(bool enabled); 00257 virtual SharedExecStreamGovernor getResourceGovernor(); 00258 virtual void prepare(ExecStreamScheduler &scheduler); 00259 virtual void open(); 00260 virtual void addStream(SharedExecStream pStream); 00261 virtual void removeStream(ExecStreamId); 00262 virtual void addDataflow( 00263 ExecStreamId producerId, 00264 ExecStreamId consumerId, 00265 bool isImplicit = false); 00266 virtual void addOutputDataflow( 00267 ExecStreamId producerId); 00268 virtual void addInputDataflow( 00269 ExecStreamId consumerId); 00270 virtual void mergeFrom(ExecStreamGraph& src); 00271 virtual void mergeFrom( 00272 ExecStreamGraph& src, 00273 std::vector const& nodes); 00274 virtual SharedExecStream findStream( 00275 std::string name); 00276 virtual SharedExecStream findLastStream( 00277 std::string name, 00278 uint iOutput); 00279 virtual void interposeStream( 00280 std::string name, 00281 uint iOutput, 00282 ExecStreamId interposedId); 00283 virtual SharedExecStream getStream(ExecStreamId id); 00284 virtual uint getInputCount( 00285 ExecStreamId streamId); 00286 virtual uint getOutputCount( 00287 ExecStreamId streamId); 00288 virtual SharedExecStream getStreamInput( 00289 ExecStreamId streamId, 00290 uint iInput); 00291 virtual SharedExecStreamBufAccessor getStreamInputAccessor( 00292 ExecStreamId streamId, 00293 uint iInput); 00294 virtual SharedExecStream getStreamOutput( 00295 ExecStreamId streamId, 00296 uint iOutput); 00297 virtual SharedExecStreamBufAccessor getStreamOutputAccessor( 00298 ExecStreamId streamId, 00299 uint iOutput); 00300 virtual std::vector getSortedStreams(); 00301 virtual int getStreamCount(); 00302 virtual int getDataflowCount(); 00303 virtual void renderGraphviz(std::ostream &dotStream); 00304 virtual bool isAcyclic(); 00305 virtual void closeProducers(ExecStreamId streamId); 00306 virtual void declareDynamicParamWriter( 00307 ExecStreamId streamId, 00308 DynamicParamId dynamicParamId); 00309 virtual void declareDynamicParamReader( 00310 ExecStreamId streamId, 00311 DynamicParamId dynamicParamId); 00312 virtual const std::vector &getDynamicParamWriters( 00313 DynamicParamId dynamicParamId); 00314 virtual const std::vector &getDynamicParamReaders( 00315 DynamicParamId dynamicParamId); 00316 }; 00317 00318 inline ExecStreamGraphImpl::GraphRep const & 00319 ExecStreamGraphImpl::getGraphRep() 00320 { 00321 return filteredGraph; 00322 } 00323 00324 inline ExecStreamGraphImpl::FullGraphRep const & 00325 ExecStreamGraphImpl::getFullGraphRep() 00326 { 00327 return graphRep; 00328 } 00329 00330 inline SharedExecStream ExecStreamGraphImpl::getStreamFromVertex( 00331 Vertex vertex) 00332 { 00333 return boost::get(boost::vertex_data,graphRep)[vertex]; 00334 } 00335 00336 inline SharedExecStreamBufAccessor & 00337 ExecStreamGraphImpl::getSharedBufAccessorFromEdge( 00338 Edge edge) 00339 { 00340 return boost::get(boost::edge_data,graphRep)[edge]; 00341 } 00342 00343 inline ExecStreamBufAccessor &ExecStreamGraphImpl::getBufAccessorFromEdge( 00344 Edge edge) 00345 { 00346 return *(getSharedBufAccessorFromEdge(edge)); 00347 } 00348 00349 FENNEL_END_NAMESPACE 00350 00351 #endif 00352 00353