Set codec thread count based on CPU load

Signed-off-by: 21pages <pages21@163.com>
This commit is contained in:
21pages
2023-07-20 21:16:38 +08:00
parent 2133f91089
commit 31b3c5d721
15 changed files with 281 additions and 91 deletions

View File

@@ -46,7 +46,6 @@ pub mod keyboard;
pub use dlopen;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
pub use machine_uid;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
pub use sysinfo;
pub use toml;
pub use uuid;

View File

@@ -4,6 +4,9 @@ pub mod linux;
#[cfg(target_os = "macos")]
pub mod macos;
#[cfg(target_os = "windows")]
pub mod windows;
#[cfg(not(debug_assertions))]
use crate::{config::Config, log};
#[cfg(not(debug_assertions))]

View File

@@ -0,0 +1,149 @@
use std::{
collections::VecDeque,
os::windows::raw::HANDLE,
sync::{Arc, Mutex},
time::Instant,
};
use winapi::{
shared::minwindef::{DWORD, FALSE},
um::{
handleapi::CloseHandle,
pdh::{
PdhAddCounterA, PdhCloseQuery, PdhCollectQueryData, PdhCollectQueryDataEx,
PdhGetFormattedCounterValue, PdhOpenQueryA, PDH_FMT_COUNTERVALUE, PDH_FMT_DOUBLE,
PDH_HCOUNTER, PDH_HQUERY,
},
synchapi::{CreateEventA, WaitForSingleObject},
winbase::{INFINITE, WAIT_OBJECT_0},
},
};
lazy_static::lazy_static! {
// Most recently stored CPU usage value paired with the instant it was stored.
// `None` means no usable value is currently available.
// Written by the monitor thread and by `sync_cpu_usage`, read by `cpu_uage_one_minute`.
// NOTE(review): the value presumably is a percentage (the counter is "% Processor Time") - confirm.
static ref CPU_USAGE_ONE_MINUTE: Arc<Mutex<Option<(f64, Instant)>>> = Arc::new(Mutex::new(None));
}
// https://github.com/mgostIH/process_list/blob/master/src/windows/mod.rs
/// Wrapper around a raw Windows `HANDLE` that passes the handle to `CloseHandle`
/// when the wrapper is dropped.
#[repr(transparent)]
pub struct RAIIHandle(pub HANDLE);

impl Drop for RAIIHandle {
    /// Closes the wrapped handle; the result of `CloseHandle` is discarded.
    fn drop(&mut self) {
        let raw = self.0;
        // Closing is not expected to cause problems except when running under a debugger.
        unsafe {
            CloseHandle(raw);
        }
    }
}
/// Wrapper around a PDH query handle that passes the handle to `PdhCloseQuery`
/// when the wrapper is dropped.
#[repr(transparent)]
pub(self) struct RAIIPDHQuery(pub PDH_HQUERY);

impl Drop for RAIIPDHQuery {
    /// Closes the wrapped query; the status returned by `PdhCloseQuery` is discarded.
    fn drop(&mut self) {
        let query = self.0;
        unsafe {
            PdhCloseQuery(query);
        }
    }
}
pub unsafe fn start_cpu_performance_monitor() {
// Code from:
// https://learn.microsoft.com/en-us/windows/win32/perfctrs/collecting-performance-data
// https://learn.microsoft.com/en-us/windows/win32/api/pdh/nf-pdh-pdhcollectquerydataex
// Why value lower than taskManager:
// https://aaron-margosis.medium.com/task-managers-cpu-numbers-are-all-but-meaningless-2d165b421e43
// Therefore we should compare with Precess Explorer rather than taskManager
std::thread::spawn(|| {
// load avg or cpu usage, test with prime95.
// Prefer cpu usage because we can get accurate value from Precess Explorer.
// const COUNTER_PATH: &'static str = "\\System\\Processor Queue Length\0";
const COUNTER_PATH: &'static str = "\\Processor(_total)\\% Processor Time\0";
const SAMPLE_INTERVAL: DWORD = 2; // 2 second
let mut ret;
let mut query: PDH_HQUERY = std::mem::zeroed();
ret = PdhOpenQueryA(std::ptr::null() as _, 0, &mut query);
if ret != 0 {
log::error!("PdhOpenQueryA failed: 0x{:X}", ret);
return;
}
let _query = RAIIPDHQuery(query);
let mut counter: PDH_HCOUNTER = std::mem::zeroed();
ret = PdhAddCounterA(query, COUNTER_PATH.as_ptr() as _, 0, &mut counter);
if ret != 0 {
log::error!("PdhAddCounterA failed: 0x{:X}", ret);
return;
}
ret = PdhCollectQueryData(query);
if ret != 0 {
log::error!("PdhCollectQueryData failed: 0x{:X}", ret);
return;
}
let mut _counter_type: DWORD = 0;
let mut counter_value: PDH_FMT_COUNTERVALUE = std::mem::zeroed();
let event = CreateEventA(std::ptr::null_mut(), FALSE, FALSE, std::ptr::null() as _);
if event.is_null() {
log::error!("CreateEventA failed: 0x{:X}", ret);
return;
}
let _event: RAIIHandle = RAIIHandle(event);
ret = PdhCollectQueryDataEx(query, SAMPLE_INTERVAL, event);
if ret != 0 {
log::error!("PdhCollectQueryDataEx failed: 0x{:X}", ret);
return;
}
let mut queue: VecDeque<f64> = VecDeque::new();
let mut recent_valid: VecDeque<bool> = VecDeque::new();
loop {
// latest one minute
if queue.len() == 31 {
queue.pop_front();
}
if recent_valid.len() == 31 {
recent_valid.pop_front();
}
// allow get value within one minute
if queue.len() > 0 && recent_valid.iter().filter(|v| **v).count() > queue.len() / 2 {
let sum: f64 = queue.iter().map(|f| f.to_owned()).sum();
let avg = sum / (queue.len() as f64);
*CPU_USAGE_ONE_MINUTE.lock().unwrap() = Some((avg, Instant::now()));
} else {
*CPU_USAGE_ONE_MINUTE.lock().unwrap() = None;
}
if WAIT_OBJECT_0 != WaitForSingleObject(event, INFINITE) {
recent_valid.push_back(false);
continue;
}
if PdhGetFormattedCounterValue(
counter,
PDH_FMT_DOUBLE,
&mut _counter_type,
&mut counter_value,
) != 0
|| counter_value.CStatus != 0
{
recent_valid.push_back(false);
continue;
}
queue.push_back(counter_value.u.doubleValue().clone());
recent_valid.push_back(true);
}
});
}
/// Returns the CPU usage value currently stored in `CPU_USAGE_ONE_MINUTE`.
///
/// Yields `None` when nothing is stored or when the stored value was recorded
/// 30 or more whole seconds ago.
pub fn cpu_uage_one_minute() -> Option<f64> {
    // Copy the value out so the lock is released before the freshness check.
    let snapshot = *CPU_USAGE_ONE_MINUTE.lock().unwrap();
    match snapshot {
        Some((usage, stored_at)) if stored_at.elapsed().as_secs() < 30 => Some(usage),
        _ => None,
    }
}
/// Overwrites the stored CPU usage with `cpu_usage`, stamping it with the current
/// instant, and logs the new value.
///
/// Passing `None` clears the stored value.
pub fn sync_cpu_usage(cpu_usage: Option<f64>) {
    let stamped = cpu_usage.map(|usage| (usage, Instant::now()));
    *CPU_USAGE_ONE_MINUTE.lock().unwrap() = stamped;
    log::info!("cpu usage synced: {:?}", cpu_usage);
}

View File

@@ -1,8 +1,8 @@
use docopt::Docopt;
use hbb_common::env_logger::{init_from_env, Env, DEFAULT_FILTER_ENV};
use scrap::{
aom::{AomDecoder, AomDecoderConfig, AomEncoder, AomEncoderConfig},
codec::{EncoderApi, EncoderCfg, Quality as Q},
aom::{AomDecoder, AomEncoder, AomEncoderConfig},
codec::{codec_thread_num, EncoderApi, EncoderCfg, Quality as Q},
Capturer, Display, TraitCapturer, VpxDecoder, VpxDecoderConfig, VpxEncoder, VpxEncoderConfig,
VpxVideoCodecId::{self, *},
STRIDE_ALIGN,
@@ -117,7 +117,6 @@ fn test_vpx(
timebase: [1, 1000],
quality,
codec: codec_id,
num_threads: (num_cpus::get() / 2) as _,
});
let mut encoder = VpxEncoder::new(config).unwrap();
let mut vpxs = vec![];
@@ -144,11 +143,7 @@ fn test_vpx(
size / yuv_count
);
let mut decoder = VpxDecoder::new(VpxDecoderConfig {
codec: codec_id,
num_threads: (num_cpus::get() / 2) as _,
})
.unwrap();
let mut decoder = VpxDecoder::new(VpxDecoderConfig { codec: codec_id }).unwrap();
let start = Instant::now();
for vpx in vpxs {
let _ = decoder.decode(&vpx);
@@ -186,10 +181,7 @@ fn test_av1(yuvs: &Vec<Vec<u8>>, width: usize, height: usize, quality: Q, yuv_co
start.elapsed() / yuv_count as _,
size / yuv_count
);
let mut decoder = AomDecoder::new(AomDecoderConfig {
num_threads: (num_cpus::get() / 2) as _,
})
.unwrap();
let mut decoder = AomDecoder::new().unwrap();
let start = Instant::now();
for av1 in av1s {
let _ = decoder.decode(&av1);
@@ -237,6 +229,7 @@ mod hw {
gop: 60,
quality: Quality_Default,
rc: RC_DEFAULT,
thread_count: codec_thread_num() as _,
};
let encoders = Encoder::available_encoders(ctx.clone());
@@ -289,6 +282,7 @@ mod hw {
let ctx = DecodeContext {
name: info.name,
device_type: info.hwdevice,
thread_count: codec_thread_num() as _,
};
let mut decoder = Decoder::new(ctx.clone()).unwrap();

View File

@@ -116,7 +116,6 @@ fn main() -> io::Result<()> {
timebase: [1, 1000],
quality,
codec: vpx_codec,
num_threads: 0,
}))
.unwrap();

View File

@@ -6,7 +6,7 @@
include!(concat!(env!("OUT_DIR"), "/aom_ffi.rs"));
use crate::codec::{base_bitrate, Quality};
use crate::codec::{base_bitrate, codec_thread_num, Quality};
use crate::{codec::EncoderApi, EncodeFrame, STRIDE_ALIGN};
use crate::{common::GoogleImage, generate_call_macro, generate_call_ptr_macro, Error, Result};
use hbb_common::{
@@ -68,25 +68,6 @@ mod webrtc {
pub const DEFAULT_Q_MAX: u32 = 56; // no more than 63
pub const DEFAULT_Q_MIN: u32 = 12; // no more than 63, litter than q_max
/// Chooses the encoder thread count from the frame size and the number of CPU cores.
///
/// The thread count is kept equal to a possible number of column/row tiles, which is
/// (1, 2, 4, 8); see the comments on AV1E_SET_TILE_COLUMNS/ROWS. This function returns:
/// * 4 for frames of at least 640x360 pixels when there are more than 4 cores,
/// * 2 for frames of at least 320x180 pixels when there are more than 2 cores,
/// * 1 otherwise.
fn number_of_threads(width: u32, height: u32, number_of_cores: usize) -> u32 {
    // Widen before multiplying so that very large dimensions cannot overflow `u32`.
    let pixels = u64::from(width) * u64::from(height);
    if pixels >= 640 * 360 && number_of_cores > 4 {
        4
    } else if pixels >= 320 * 180 && number_of_cores > 2 {
        2
    } else {
        // The former ARM/Android-only branch repeated the condition of the `else if`
        // above, so it could never be taken and has been removed.
        1
    }
}
// Only positive speeds, range for real-time coding currently is: 6 - 8.
// Lower means slower/better quality, higher means fastest/lower quality.
fn get_cpu_speed(width: u32, height: u32) -> u32 {
@@ -120,7 +101,7 @@ mod webrtc {
// Overwrite default config with input encoder settings & RTC-relevant values.
c.g_w = cfg.width;
c.g_h = cfg.height;
c.g_threads = number_of_threads(cfg.width, cfg.height, num_cpus::get());
c.g_threads = codec_thread_num() as _;
c.g_timebase.num = 1;
c.g_timebase.den = kRtpTicksPerSecond;
c.g_input_bit_depth = kBitDepth;
@@ -415,24 +396,16 @@ impl<'a> Iterator for EncodeFrames<'a> {
}
}
pub struct AomDecoderConfig {
pub num_threads: u32,
}
pub struct AomDecoder {
ctx: aom_codec_ctx_t,
}
impl AomDecoder {
pub fn new(cfg: AomDecoderConfig) -> Result<Self> {
pub fn new() -> Result<Self> {
let i = call_aom_ptr!(aom_codec_av1_dx());
let mut ctx = Default::default();
let cfg = aom_codec_dec_cfg_t {
threads: if cfg.num_threads == 0 {
num_cpus::get() as _
} else {
cfg.num_threads
},
threads: codec_thread_num() as _,
w: 0,
h: 0,
allow_lowbitdepth: 1,

View File

@@ -11,14 +11,12 @@ use crate::mediacodec::{
MediaCodecDecoder, MediaCodecDecoders, H264_DECODER_SUPPORT, H265_DECODER_SUPPORT,
};
use crate::{
aom::{self, AomDecoder, AomDecoderConfig, AomEncoder, AomEncoderConfig},
aom::{self, AomDecoder, AomEncoder, AomEncoderConfig},
common::GoogleImage,
vpxcodec::{self, VpxDecoder, VpxDecoderConfig, VpxEncoder, VpxEncoderConfig, VpxVideoCodecId},
CodecName, ImageRgb,
};
#[cfg(not(any(target_os = "android", target_os = "ios")))]
use hbb_common::sysinfo::{System, SystemExt};
use hbb_common::{
anyhow::anyhow,
config::PeerConfig,
@@ -31,10 +29,15 @@ use hbb_common::{
};
#[cfg(any(feature = "hwcodec", feature = "mediacodec"))]
use hbb_common::{config::Config2, lazy_static};
use hbb_common::{
sysinfo::{System, SystemExt},
tokio::time::Instant,
};
lazy_static::lazy_static! {
static ref PEER_DECODINGS: Arc<Mutex<HashMap<i32, SupportedDecoding>>> = Default::default();
static ref CODEC_NAME: Arc<Mutex<CodecName>> = Arc::new(Mutex::new(CodecName::VP9));
static ref THREAD_LOG_TIME: Arc<Mutex<Option<Instant>>> = Arc::new(Mutex::new(None));
}
#[derive(Debug, Clone)]
@@ -192,7 +195,6 @@ impl Encoder {
#[allow(unused_mut)]
let mut auto_codec = CodecName::VP9;
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if vp8_useable && System::new_all().total_memory() <= 4 * 1024 * 1024 * 1024 {
// 4 Gb
auto_codec = CodecName::VP8
@@ -276,18 +278,13 @@ impl Decoder {
pub fn new() -> Decoder {
let vp8 = VpxDecoder::new(VpxDecoderConfig {
codec: VpxVideoCodecId::VP8,
num_threads: (num_cpus::get() / 2) as _,
})
.unwrap();
let vp9 = VpxDecoder::new(VpxDecoderConfig {
codec: VpxVideoCodecId::VP9,
num_threads: (num_cpus::get() / 2) as _,
})
.unwrap();
let av1 = AomDecoder::new(AomDecoderConfig {
num_threads: (num_cpus::get() / 2) as _,
})
.unwrap();
let av1 = AomDecoder::new().unwrap();
Decoder {
vp8,
vp9,
@@ -503,3 +500,42 @@ pub fn base_bitrate(width: u32, height: u32) -> u32 {
}
base_bitrate
}
/// Picks how many threads a codec should use, depending on how busy the CPU is.
///
/// The result is always within `1..=max(1, cores / 2)`:
/// * on Windows it is derived from the CPU usage reported by
///   `cpu_uage_one_minute()` (half of the idle share of the cores);
/// * elsewhere it is derived from the one-minute load average
///   (half of the cores that are not covered by the load).
///
/// When the CPU load is unknown the upper bound is returned. When the CPU is fully
/// loaded the estimate is zero and a single thread is returned.
///
/// The chosen value is logged, throttled so that frequent calls do not flood the log.
pub fn codec_thread_num() -> usize {
    let max: usize = num_cpus::get();
    // Never hand more than half of the logical cores to a codec, but always allow one thread.
    let cap = std::cmp::max(1, max / 2);
    // `None` means the load is unknown. A computed zero (CPU saturated) is kept distinct,
    // so it no longer falls back to the maximum thread count.
    let estimate: Option<usize>;
    let info;
    #[cfg(windows)]
    {
        let percent = hbb_common::platform::windows::cpu_uage_one_minute();
        info = format!("cpu usage:{:?}", percent);
        // The float-to-integer cast saturates, so a usage of 100% or more yields 0.
        estimate = percent.map(|p| ((100.0 - p) * (max as f64) / 200.0).round() as usize);
    }
    #[cfg(not(windows))]
    {
        // Only the load average is read here, so avoid the expensive `System::new_all()`.
        let s = System::new();
        // https://man7.org/linux/man-pages/man3/getloadavg.3.html
        let avg = s.load_average();
        info = format!("cpu loadavg:{}", avg.one);
        // The float-to-integer cast saturates, so a load above the core count yields 0.
        estimate = Some((((max as f64) - avg.one) * 0.5).round() as usize);
    }
    let res = match estimate {
        Some(threads) => threads.clamp(1, cap),
        None => cap,
    };
    // avoid frequent log
    {
        let mut last_log = THREAD_LOG_TIME.lock().unwrap();
        let due = match *last_log {
            Some(at) => at.elapsed().as_secs() > 1,
            None => true,
        };
        if due {
            log::info!("cpu num: {max}, {info}, codec thread: {res}");
            *last_log = Some(Instant::now());
        }
    }
    res
}

View File

@@ -1,5 +1,5 @@
use crate::{
codec::{base_bitrate, EncoderApi, EncoderCfg},
codec::{base_bitrate, codec_thread_num, EncoderApi, EncoderCfg},
hw, ImageFormat, ImageRgb, HW_STRIDE_ALIGN,
};
use hbb_common::{
@@ -63,6 +63,7 @@ impl EncoderApi for HwEncoder {
gop: DEFAULT_GOP,
quality: DEFAULT_HW_QUALITY,
rc: DEFAULT_RC,
thread_count: codec_thread_num() as _, // ffmpeg's thread_count is used for cpu
};
let format = match Encoder::format_from_name(config.name.clone()) {
Ok(format) => format,
@@ -239,6 +240,7 @@ impl HwDecoder {
let ctx = DecodeContext {
name: info.name.clone(),
device_type: info.hwdevice.clone(),
thread_count: codec_thread_num() as _,
};
match Decoder::new(ctx) {
Ok(decoder) => Ok(HwDecoder { decoder, info }),
@@ -335,6 +337,7 @@ pub fn check_config() {
gop: DEFAULT_GOP,
quality: DEFAULT_HW_QUALITY,
rc: DEFAULT_RC,
thread_count: 4,
};
let encoders = CodecInfo::score(Encoder::available_encoders(ctx));
let decoders = CodecInfo::score(Decoder::available_decoders());

View File

@@ -7,7 +7,7 @@ use hbb_common::log;
use hbb_common::message_proto::{EncodedVideoFrame, EncodedVideoFrames, Message, VideoFrame};
use hbb_common::ResultType;
use crate::codec::{base_bitrate, EncoderApi, Quality};
use crate::codec::{base_bitrate, codec_thread_num, EncoderApi, Quality};
use crate::{GoogleImage, STRIDE_ALIGN};
use super::vpx::{vp8e_enc_control_id::*, vpx_codec_err_t::*, *};
@@ -71,11 +71,7 @@ impl EncoderApi for VpxEncoder {
// When the data buffer falls below this percentage of fullness, a dropped frame is indicated. Set the threshold to zero (0) to disable this feature.
// In dynamic scenes, low bitrate gets low fps while high bitrate gets high fps.
c.rc_dropframe_thresh = 25;
c.g_threads = if config.num_threads == 0 {
num_cpus::get() as _
} else {
config.num_threads
};
c.g_threads = codec_thread_num() as _;
c.g_error_resilient = VPX_ERROR_RESILIENT_DEFAULT;
// https://developers.google.com/media/vp9/bitrate-modes/
// Constant Bitrate mode (CBR) is recommended for live streaming with VP9.
@@ -353,13 +349,11 @@ pub struct VpxEncoderConfig {
pub quality: Quality,
/// The codec
pub codec: VpxVideoCodecId,
pub num_threads: u32,
}
#[derive(Clone, Copy, Debug)]
pub struct VpxDecoderConfig {
pub codec: VpxVideoCodecId,
pub num_threads: u32,
}
pub struct EncodeFrames<'a> {
@@ -406,11 +400,7 @@ impl VpxDecoder {
};
let mut ctx = Default::default();
let cfg = vpx_codec_dec_cfg_t {
threads: if config.num_threads == 0 {
num_cpus::get() as _
} else {
config.num_threads
},
threads: codec_thread_num() as _,
w: 0,
h: 0,
};