赞
踩
首先是RtmpOutput输出对象的创建和编码器绑定
- RtmpOutput::RtmpOutput() {
- CreateOutputWithUniqueName();
- }
-
- RtmpOutput::~RtmpOutput()
- {
- obs_encoder_release(videoEncoder_);
- obs_encoder_release(delayVideoEncoder_);
- }
-
- void RtmpOutput::CreateOutputWithUniqueName() {
- uid_++;
- auto uid = uid_.load();
- std::string rtmpNameFormat = "rtmp_stream_%d";
- std::string rtmpOutputName = string_format(rtmpNameFormat, uid);
- obs_data_t* rtmpSettings = obs_data_create();
- SetDefaultRtmpSettings(rtmpSettings);
- auto rtmpSettingsGuard = gsl::finally([&]() { obs_data_release(rtmpSettings); });
- output_ = obs_output_create("rtmp_output", rtmpOutputName.c_str(),
- nullptr, nullptr);
- UpdateOutputSettings(rtmpSettings);
- auto rtmpOutputGuard = gsl::finally([&]() { obs_output_release(output_); });
-
- std::string rtmpServiceNameFormat = "rtmp_service_%d";
- std::string rtmpServiceName = string_format(rtmpServiceNameFormat, uid);
-
- service_ = obs_service_create("rtmp_custom", rtmpServiceName.c_str(), nullptr,
- nullptr);
- auto serviceGuard = gsl::finally([&]() { obs_service_release(service_); });
-
- obs_output_set_service(output_, service_);
- }

调用obs_output_create()函数,根据输出id创建推流对象;与创建编码对象类似,推流对象在加载模块时已添加到obs->output_types中,获取到的推流输出对象赋值给streamOutput指针。随后调用obs_output_set_video_encoder()函数,将推流输出的video_encoder设置为编码器。
- bool obs_output_start(obs_output_t *output)
- {
- bool encoded;
- if (!obs_output_valid(output, "obs_output_start"))
- return false;
- if (!output->context.data)
- return false;
- output->reconnect_total_retries = 0;
- encoded = (output->info.flags & OBS_OUTPUT_ENCODED) != 0;
-
- if (encoded && output->delay_sec) {
- return obs_output_delay_start(output);
- } else {
- if (obs_output_actual_start(output)) {
- do_output_signal(output, "starting");
- return true;
- }
-
- return false;
- }
- }

调用链为 obs_output_start() -> obs_output_actual_start():后者调用推流对象的 output->info.start() 回调函数开启推流,其中 start 绑定至 rtmp_stream_start()。
- bool obs_output_actual_start(obs_output_t *output)
- {
- bool success = false;
-
- os_event_wait(output->connecting_event);
- if (delay_active(output) && delay_capturing(output)) {
- os_event_signal(output->connecting_event);
- return true;
- }
-
- os_event_wait(output->stopping_event);
- output->stop_code = 0;
- if (output->last_error_message) {
- bfree(output->last_error_message);
- output->last_error_message = NULL;
- }
- blog(LOG_INFO, "output %s actual start", obs_output_get_name(output));
- if (output->context.data) {
- success = output->info.start(output->context.data);
- if (!success) {
- if (output->mixer) {
- obs_stop_core_audio_mix(output->mixer);
- output->mixer = NULL;
- }
- if (output->video_output) {
- obs_stop_video_output(output->video_output);
- output->video_output = NULL;
- }
-
- os_event_signal(output->connecting_event);
- blog(LOG_INFO, "output %s actual start return false", obs_output_get_name(output));
- }
- }
-
- if (success && output->video) {
- output->starting_frame_count =
- video_output_get_total_frames(output->video);
- output->starting_drawn_count = obs->video.total_frames;
- output->starting_lagged_count = obs->video.lagged_frames;
- }
-
- if (os_atomic_load_long(&output->delay_restart_refs))
- os_atomic_dec_long(&output->delay_restart_refs);
-
- output->caption_timestamp = 0;
- return success;
- }

static bool rtmp_stream_start(void *data) 创建线程,执行 connect_thread() 函数(static void *connect_thread(void *data))。
connect_thread() 先调用 init_connect() 初始化推流:调用 free_packets() 清空 stream->packets,获取推流设置并赋值到 stream;随后调用 try_connect() 连接 rtmp 服务器:
- static void *connect_thread(void *data)
- {
- struct rtmp_stream *stream = data;
- int ret;
-
- os_set_thread_name("rtmp-stream: connect_thread");
-
- if (!init_connect(stream)) {
- obs_output_signal_stop(stream->output, OBS_OUTPUT_BAD_PATH);
- return NULL;
- }
-
- ret = try_connect(stream);
-
- if (ret != OBS_OUTPUT_SUCCESS) {
- obs_output_signal_stop(stream->output, ret);
- info("Connection to %s failed: %d", stream->path.array, ret);
- }
-
- if (!stopping(stream))
- pthread_detach(stream->connect_thread);
-
- os_atomic_set_bool(&stream->connecting, false);
- return NULL;
- }

RTMP_Init(&stream->rtmp)初始化rtmp客户端,设置推流服务器地址、用户名、密码、流地址、音频编码名称(为何没有添加视频编码名称);
RTMP_Connect()连接rtmp服务器;
RTMP_ConnectStream()连接rtmp流地址;
init_send()启动发送流程:reset_semaphore()重置发送信号量,创建推流执行线程send_thread,并调用send_meta_data()发送流的元数据(meta data);
然后开启推流数据捕获obs_output_begin_data_capture();
send_thread()线程函数:循环等待信号量stream->send_sem被唤醒;唤醒后调用get_next_packet()取出队列中的第一个已编码数据包,执行send_packet()函数,在其中调用flv_packet_mux()进行flv数据封包,再调用RTMP_Write()发送数据包,完成视频数据推流。
- static void *send_thread(void *data)
- {
- struct rtmp_stream *stream = data;
-
- os_set_thread_name("rtmp-stream: send_thread");
-
- while (os_sem_wait(stream->send_sem) == 0) {
- struct encoder_packet packet;
-
- if (stopping(stream) && stream->stop_ts == 0) {
- break;
- }
-
- if (!get_next_packet(stream, &packet))
- continue;
-
- if (stopping(stream)) {
- if (can_shutdown_stream(stream, &packet)) {
- obs_encoder_packet_release(&packet);
- break;
- }
- }
-
- if (!stream->sent_headers) {
- if (!send_headers(stream)) {
- os_atomic_set_bool(&stream->disconnected, true);
- break;
- }
- }
-
- if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
- os_atomic_set_bool(&stream->disconnected, true);
- break;
- }
- }
-
- if (disconnected(stream)) {
- info("Disconnected from %s", stream->path.array);
- } else {
- info("User stopped the stream");
- }
-
- if (stream->new_socket_loop) {
- os_event_signal(stream->send_thread_signaled_exit);
- os_event_signal(stream->buffer_has_data_event);
- pthread_join(stream->socket_thread, NULL);
- stream->socket_thread_active = false;
- stream->rtmp.m_bCustomSend = false;
- }
-
- set_output_error(stream);
- RTMP_Close(&stream->rtmp);
-
- if (!stopping(stream)) {
- pthread_detach(stream->send_thread);
- obs_output_signal_stop(stream->output, stream->sent_headers? OBS_OUTPUT_DISCONNECTED: OBS_OUTPUT_CONNECT_FAILED);
- } else {
- obs_output_end_data_capture(stream->output);
- }
-
- free_packets(stream);
- os_event_reset(stream->stop_event);
- os_atomic_set_bool(&stream->active, false);
- stream->sent_headers = false;
- return NULL;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。