BmfNode
Detailed Description
BMFNode class.
Function Documentation
__init__()
def bmf.builder.bmf_node.BmfNode.__init__ ( self,
module_info,
option,
upstream_streams,
input_manager = 'default',
pre_module = None,
scheduler = 0
)
pre_module=None, scheduler=0):
if isinstance(module_info, dict):
self.module_info_ = module_info
else:
self.module_info_ = {"name": module_info}
self.option_ = option
self.scheduler_ = scheduler
# the input stream of current node and corresponding
# output stream of upstream node point to same stream instance
self.input_streams_ = {}
if upstream_streams is not None:
self.graph_ = self.init_input_streams(upstream_streams)
else:
from .bmf_graph import BmfGraph
self.graph_ = BmfGraph(option)
print('DEBUG GRAPH created new graph for empty inputstream node')
# input manager
self.input_manager_ = input_manager
# pre_allocated module if exists
self.pre_module = pre_module
self.user_callbacks = {}
# generate node_id and add node to graph
assert (self.graph_ is not None), "graph is none when create node"
self.id_ = self.graph_.generate_node_id()
self.graph_.add_node(self)
# output stream is empty now, created by calling stream() or []
# TODO: do we need keep all streams or only need keep all edges?
self.output_streams_ = {}
self.output_stream_idx = 0
self.output_stream_idx_mutex_ = threading.Lock()
# downstream edges
# while the output stream is actually connected a node, it will
# create an edge and add to outgoing_edges_
self.outgoing_edges_ = []
Member Functions
def init (self, module_info, option, upstream_streams, input_manager=‘default’, pre_module =None, scheduler=0)
def init_input_stream_and_edge (self, upstream_stream, notify)
def init_input_streams (self, upstream_streams)
def generate_stream_name (self)
def stream (self, notify=None, stream_alias=None)
def getitem (self, item)
def add_outgoing_edge (self, edge)
def get_outgoing_edges (self)
def get_input_streams (self)
def get_output_streams (self)
def get_module_info (self)
def get_id (self)
def get_scheduler (self)
def set_scheduler (self, schediler)
def get_option (self)
def get_pre_module (self)
def get_graph (self)
def get_input_manager (self)
def set_input_manager (self, input_manager)
def run (self)
def add_user_callback (self, key, cb)
def get_user_callback (self)
def start (self)
def create_sync_module (self)
Member Datas
Member Function Documentation
getitem()
def bmf.builder.bmf_node.BmfNode.__getitem__ ( self,
item
)
def __getitem__(self, item):
return self.stream(notify=item)
add_outgoing_edge()
def bmf.builder.bmf_node.BmfNode.add_outgoing_edge ( self,
edge
)
def add_outgoing_edge(self, edge):
if edge is not None:
self.outgoing_edges_.append(edge)
add_user_callback()
def bmf.builder.bmf_node.BmfNode.add_user_callback ( self,
key,
cb
)
def add_user_callback(self, key, cb):
from bmf.lib._bmf import engine
callback = engine.Callback(cb)
self.user_callbacks[key] = (callback.uid(), callback)
Example:
bmf.encode(
video['video'], video['audio'], {
"output_path": output_path,
"video_params": {
"codec": "h264",
"width": 320,
"height": 240,
"crf": "23",
"preset": "veryfast"
}
}).node_.add_user_callback(bmf.BmfCallBackType.LATEST_TIMESTAMP,
cb)
If you need the complete code, you can refer to test_transcode.py
create_sync_module()
def bmf.builder.bmf_node.BmfNode.create_sync_module ( self )
def create_sync_module(self):
from bmf.lib._bmf import engine
node_option = json.dumps(self.option_)
# convert node option for filter
if self.module_info_["name"] == "c_ffmpeg_filter":
node_config = self.get_graph().generate_node_config(self)
node_option = engine.convert_filter_para(node_config.dump())
# create module
mod = engine.Module(self.module_info_["name"], node_option,
self.module_info_["type"],
self.module_info_["path"], self.module_info_["entry"])
# input stream list
input_stream_id = []
if self.module_info_["name"] == "c_ffmpeg_encoder" and 1 in self.get_input_streams().keys():
input_stream_id.append(0)
input_stream_id.append(1)
else:
for id in self.get_input_streams().keys():
input_stream_id.append(id)
# output stream list
output_stream_id = []
if self.module_info_["name"] == "c_ffmpeg_decoder":
for key in self.get_output_streams().keys():
if key == "video":
output_stream_id.append(0)
elif key == "audio":
output_stream_id.append(1)
else:
for id in self.get_output_streams().keys():
output_stream_id.append(id)
# create sync module
sync_module = SyncModule(mod, input_stream_id, output_stream_id)
return sync_module
generate_stream_name()
def bmf.builder.bmf_node.BmfNode.generate_stream_name ( self )
def generate_stream_name(self):
# stream name format: $(node_type)_$(node_id)_$(stream_index)
self.output_stream_idx_mutex_.acquire()
stream_name = self.module_info_["name"] + '_' + str(self.id_) + '_' + str(self.output_stream_idx)
self.output_stream_idx += 1
self.output_stream_idx_mutex_.release()
return stream_name
get_graph()
def bmf.builder.bmf_node.BmfNode.get_graph ( self )
def get_graph(self):
return self.graph_
get_id()
def bmf.builder.bmf_node.BmfNode.get_id ( self )
def get_id(self):
return self.id_
get_input_manager()
def bmf.builder.bmf_node.BmfNode.get_input_manager ( self )
def get_input_manager(self):
return self.input_manager_
get_input_streams()
def bmf.builder.bmf_node.BmfNode.get_input_streams ( self )
def get_input_streams(self):
return self.input_streams_
get_module_info()
def bmf.builder.bmf_node.BmfNode.get_module_info ( self )
def get_module_info(self):
return self.module_info_
get_option()
def bmf.builder.bmf_node.BmfNode.get_option ( self )
def get_option(self):
return self.option_
get_outgoing_edges()
def bmf.builder.bmf_node.BmfNode.get_outgoing_edges ( self )
def get_outgoing_edges(self):
return self.outgoing_edges_
get_output_streams()
def bmf.builder.bmf_node.BmfNode.get_output_streams ( self )
def get_output_streams(self):
return self.output_streams_
get_pre_module()
def bmf.builder.bmf_node.BmfNode.get_pre_module ( self )
def get_pre_module(self):
return self.pre_module
get_scheduler()
def bmf.builder.bmf_node.BmfNode.get_scheduler ( self )
def get_scheduler(self):
return self.scheduler_
get_user_callback()
def bmf.builder.bmf_node.BmfNode.get_user_callback ( self )
def get_user_callback(self):
return self.user_callbacks
init_input_stream_and_edge()
def bmf.builder.bmf_node.BmfNode.init_input_stream_and_edge ( self,
upstream_stream,
notify
)
def init_input_stream_and_edge(self, upstream_stream, notify):
graph = None
if upstream_stream is not None:
# create input stream
input_stream = BmfStream(upstream_stream.get_name(), self, notify, stream_alias=upstream_stream.get_alias())
self.input_streams_[notify] = input_stream
# get graph
graph = upstream_stream.get_graph()
# create edge
edge = BmfEdge(upstream_stream, input_stream)
# add edge to upstream node
if upstream_stream.get_node() is not None:
upstream_stream.get_node().add_outgoing_edge(edge)
return graph
init_input_streams()
def bmf.builder.bmf_node.BmfNode.init_input_streams ( self,
upstream_streams
)
def init_input_streams(self, upstream_streams):
graph = None
from .bmf_graph import BmfGraph
if upstream_streams is None:
return
elif isinstance(upstream_streams, BmfGraph):
# for source node, there is no input streams
# use graph to initialize node
return upstream_streams
elif isinstance(upstream_streams, BmfStream):
# if there is only one upstream stream, notify is 0
graph = self.init_input_stream_and_edge(upstream_streams, 0)
elif isinstance(upstream_streams, (list, tuple)):
for index, upstream_stream in enumerate(upstream_streams):
# for list input, index is notify
graph = self.init_input_stream_and_edge(upstream_stream, index)
elif isinstance(upstream_streams, dict):
for (notify, upstream_stream) in upstream_streams.items():
graph = self.init_input_stream_and_edge(upstream_stream, notify)
return graph
run()
def bmf.builder.bmf_node.BmfNode.run ( self )
def run(self):
self.graph_.run()
set_input_manager()
def bmf.builder.bmf_node.BmfNode.set_input_manager ( self,
input_manager
)
def set_input_manager(self, input_manager):
self.input_manager_ = input_manager
set_scheduler()
def bmf.builder.bmf_node.BmfNode.set_scheduler ( self,
schediler
)
def set_scheduler(self, schediler):
self.scheduler_ = schediler
start()
def bmf.builder.bmf_node.BmfNode.start ( self )
def start(self):
print('no output stream')
stream()
def bmf.builder.bmf_node.BmfNode.stream ( self,
notify = None,
stream_alias = None
)
def stream(self, notify=None, stream_alias=None):
if notify is None:
notify = 0
if notify not in self.output_streams_.keys():
stream_name = self.generate_stream_name()
# create output stream
s = BmfStream(stream_name, self, notify, stream_alias=stream_alias)
self.output_streams_[notify] = s
return self.output_streams_[notify]
Member Data Documentation
graph_
bmf.builder.bmf_node.BmfNode.graph_
id_
bmf.builder.bmf_node.BmfNode.id_
input_manager_
bmf.builder.bmf_node.BmfNode.input_manager_
input_streams_
bmf.builder.bmf_node.BmfNode.input_streams_
module_info_
bmf.builder.bmf_node.BmfNode.module_info_
option_
bmf.builder.bmf_node.BmfNode.option_
outgoing_edges_
bmf.builder.bmf_node.BmfNode.outgoing_edges_
output_stream_idx
bmf.builder.bmf_node.BmfNode.output_stream_idx
output_stream_idx_mutex_
bmf.builder.bmf_node.BmfNode.output_stream_idx_mutex_
output_streams_
bmf.builder.bmf_node.BmfNode.output_streams_
pre_module
bmf.builder.bmf_node.BmfNode.pre_module
scheduler_
bmf.builder.bmf_node.BmfNode.scheduler_
user_callbacks
bmf.builder.bmf_node.BmfNode.user_callbacks
- /20230627/doxygen_converter/bmf/bmf/builder/ bmf_node.py