diff --git a/crates/nu_plugin_polars/src/cache/mod.rs b/crates/nu_plugin_polars/src/cache/mod.rs
index 8862f5bb51..08a7a9d7f7 100644
--- a/crates/nu_plugin_polars/src/cache/mod.rs
+++ b/crates/nu_plugin_polars/src/cache/mod.rs
@@ -13,7 +13,7 @@ use nu_plugin::{EngineInterface, PluginCommand};
 use nu_protocol::{LabeledError, ShellError, Span};
 use uuid::Uuid;
 
-use crate::{plugin_debug, values::PolarsPluginObject, PolarsPlugin};
+use crate::{plugin_debug, values::PolarsPluginObject, EngineWrapper, PolarsPlugin};
 
 #[derive(Debug, Clone)]
 pub struct CacheValue {
@@ -47,7 +47,7 @@ impl Cache {
     /// * `force` - Delete even if there are multiple references
     pub fn remove(
         &self,
-        maybe_engine: Option<&EngineInterface>,
+        engine: impl EngineWrapper,
         key: &Uuid,
         force: bool,
     ) -> Result<Option<CacheValue>, ShellError> {
@@ -60,22 +60,25 @@ impl Cache {
 
         let removed = if force || reference_count.unwrap_or_default() < 1 {
             let removed = lock.remove(key);
-            plugin_debug!("PolarsPlugin: removing {key} from cache: {removed:?}");
+            plugin_debug!(
+                engine,
+                "PolarsPlugin: removing {key} from cache: {removed:?}"
+            );
             removed
         } else {
-            plugin_debug!("PolarsPlugin: decrementing reference count for {key}");
+            plugin_debug!(
+                engine,
+                "PolarsPlugin: decrementing reference count for {key}"
+            );
             None
         };
 
         // Once there are no more entries in the cache
         // we can turn plugin gc back on
-        match maybe_engine {
-            Some(engine) if lock.is_empty() => {
-                plugin_debug!("PolarsPlugin: Cache is empty enabling GC");
-                engine.set_gc_disabled(false).map_err(LabeledError::from)?;
-            }
-            _ => (),
-        };
+        if lock.is_empty() {
+            plugin_debug!(engine, "PolarsPlugin: Cache is empty enabling GC");
+            engine.set_gc_disabled(false).map_err(LabeledError::from)?;
+        }
         drop(lock);
         Ok(removed)
     }
@@ -84,23 +87,23 @@ impl Cache {
-    /// The maybe_engine parameter is required outside of testing
+    /// The engine parameter provides debug settings and plugin GC control
     pub fn insert(
         &self,
-        maybe_engine: Option<&EngineInterface>,
+        engine: impl EngineWrapper,
         uuid: Uuid,
         value: PolarsPluginObject,
         span: Span,
     ) -> Result<Option<CacheValue>, ShellError> {
         let mut lock = self.lock()?;
-        plugin_debug!("PolarsPlugin: Inserting {uuid} into cache: {value:?}");
+        plugin_debug!(
+            engine,
+            "PolarsPlugin: Inserting {uuid} into cache: {value:?}"
+        );
         // turn off plugin gc the first time an entry is added to the cache
         // as we don't want the plugin to be garbage collected if there
         // is any live data
-        match maybe_engine {
-            Some(engine) if lock.is_empty() => {
-                plugin_debug!("PolarsPlugin: Cache has values disabling GC");
-                engine.set_gc_disabled(true).map_err(LabeledError::from)?;
-            }
-            _ => (),
-        };
+        if lock.is_empty() {
+            plugin_debug!(engine, "PolarsPlugin: Cache has values disabling GC");
+            engine.set_gc_disabled(true).map_err(LabeledError::from)?;
+        }
         let cache_value = CacheValue {
             uuid,
             value,
@@ -154,7 +157,7 @@ pub trait Cacheable: Sized + Clone {
         span: Span,
     ) -> Result<Value, ShellError> {
         plugin.cache.insert(
-            Some(engine),
+            engine,
             self.cache_id().to_owned(),
             self.to_cache_value()?,
             span,
diff --git a/crates/nu_plugin_polars/src/cache/rm.rs b/crates/nu_plugin_polars/src/cache/rm.rs
index 5918209f32..9113f39cec 100644
--- a/crates/nu_plugin_polars/src/cache/rm.rs
+++ b/crates/nu_plugin_polars/src/cache/rm.rs
@@ -63,7 +63,7 @@ fn remove_cache_entry(
     let key = as_uuid(key, span)?;
     let msg = plugin
         .cache
-        .remove(Some(engine), &key, true)?
+        .remove(engine, &key, true)?
         .map(|_| format!("Removed: {key}"))
         .unwrap_or_else(|| format!("No value found for key: {key}"));
     Ok(Value::string(msg, span))
diff --git a/crates/nu_plugin_polars/src/dataframe/eager/open.rs b/crates/nu_plugin_polars/src/dataframe/eager/open.rs
index 902a4df5a9..8842e3c7fb 100644
--- a/crates/nu_plugin_polars/src/dataframe/eager/open.rs
+++ b/crates/nu_plugin_polars/src/dataframe/eager/open.rs
@@ -1,5 +1,6 @@
 use crate::{
     dataframe::values::NuSchema,
+    perf,
     values::{CustomValueSupport, NuLazyFrame},
     PolarsPlugin,
 };
@@ -378,7 +379,10 @@ fn from_jsonl(
         .get_flag("schema")?
         .map(|schema| NuSchema::try_from(&schema))
         .transpose()?;
+
     if call.has_flag("lazy")? {
+        let start_time = std::time::Instant::now();
+
         let df = LazyJsonLineReader::new(file_path)
             .with_infer_schema_length(infer_schema)
             .with_schema(maybe_schema.map(|s| s.into()))
@@ -390,6 +394,16 @@ fn from_jsonl(
                 help: None,
                 inner: vec![],
             })?;
+
+        perf(
+            engine,
+            "Lazy json lines dataframe open",
+            start_time,
+            file!(),
+            line!(),
+            column!(),
+        );
+
         let df = NuLazyFrame::new(false, df);
         df.cache_and_to_value(plugin, engine, call.head)
     } else {
@@ -410,6 +424,8 @@ fn from_jsonl(
             None => reader,
         };
 
+        let start_time = std::time::Instant::now();
+
         let df: NuDataFrame = reader
             .finish()
             .map_err(|e| ShellError::GenericError {
@@ -421,6 +437,15 @@ fn from_jsonl(
             })?
             .into();
 
+        perf(
+            engine,
+            "Eager json lines dataframe open",
+            start_time,
+            file!(),
+            line!(),
+            column!(),
+        );
+
         df.cache_and_to_value(plugin, engine, call.head)
     }
 }
@@ -484,6 +509,7 @@ fn from_csv(
             Some(r) => csv_reader.with_skip_rows(r),
         };
 
+        let start_time = std::time::Instant::now();
         let df: NuLazyFrame = csv_reader
             .finish()
             .map_err(|e| ShellError::GenericError {
@@ -495,8 +521,18 @@ fn from_csv(
             })?
             .into();
 
+        perf(
+            engine,
+            "Lazy CSV dataframe open",
+            start_time,
+            file!(),
+            line!(),
+            column!(),
+        );
+
         df.cache_and_to_value(plugin, engine, call.head)
     } else {
+        let start_time = std::time::Instant::now();
         let df = CsvReadOptions::default()
             .with_has_header(!no_header)
             .with_infer_schema_length(infer_schema)
@@ -529,6 +565,16 @@ fn from_csv(
                 help: None,
                 inner: vec![],
             })?;
+
+        perf(
+            engine,
+            "Eager CSV dataframe open",
+            start_time,
+            file!(),
+            line!(),
+            column!(),
+        );
+
         let df = NuDataFrame::new(false, df);
         df.cache_and_to_value(plugin, engine, call.head)
     }
diff --git a/crates/nu_plugin_polars/src/lib.rs b/crates/nu_plugin_polars/src/lib.rs
index 3baca54ad9..76da2eb039 100644
--- a/crates/nu_plugin_polars/src/lib.rs
+++ b/crates/nu_plugin_polars/src/lib.rs
@@ -8,25 +8,103 @@ use nu_plugin::{EngineInterface, Plugin, PluginCommand};
 mod cache;
 pub mod dataframe;
 pub use dataframe::*;
-use nu_protocol::{ast::Operator, CustomValue, LabeledError, Spanned, Value};
+use nu_protocol::{ast::Operator, CustomValue, LabeledError, ShellError, Span, Spanned, Value};
 
 use crate::{
     eager::eager_commands, expressions::expr_commands, lazy::lazy_commands,
     series::series_commands, values::PolarsPluginCustomValue,
 };
 
+/// Abstraction over the engine facilities the plugin uses for debug logging,
+/// perf reporting and plugin garbage-collection control, so that callers
+/// without a real `EngineInterface` (such as tests) can supply a stand-in.
+pub trait EngineWrapper {
+    /// Returns the value of the environment variable `key`, if one is available.
+    fn get_env_var(&self, key: &str) -> Option<String>;
+    /// Returns whether diagnostic output should use ANSI colors.
+    fn use_color(&self) -> bool;
+    /// Disables (`true`) or re-enables (`false`) garbage collection of the plugin.
+    fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError>;
+}
+
+impl EngineWrapper for &EngineInterface {
+    fn get_env_var(&self, key: &str) -> Option<String> {
+        EngineInterface::get_env_var(self, key)
+            .ok()
+            .flatten()
+            .map(|x| match x {
+                Value::String { val, .. } => val,
+                _ => "".to_string(),
+            })
+    }
+
+    fn use_color(&self) -> bool {
+        self.get_config()
+            .ok()
+            .and_then(|config| config.color_config.get("use_color").cloned())
+            .unwrap_or(Value::bool(false, Span::unknown()))
+            .is_true()
+    }
+
+    fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
+        EngineInterface::set_gc_disabled(self, disabled)
+    }
+}
+
+/// Prints a debug message to stderr when `POLARS_PLUGIN_DEBUG` is `1` or `true`.
+///
+/// The first argument must provide a `get_env_var` method (see `EngineWrapper`);
+/// the remaining arguments are forwarded unchanged to `eprintln!`.
 #[macro_export]
 macro_rules! plugin_debug {
-    ($($arg:tt)*) => {{
-        if std::env::var("POLARS_PLUGIN_DEBUG")
-            .ok()
-            .filter(|x| x == "1" || x == "true")
+    ($env_var_provider:tt, $($arg:tt)*) => {{
+        if $env_var_provider.get_env_var("POLARS_PLUGIN_DEBUG")
+            .filter(|s| s == "1" || s == "true")
             .is_some() {
             eprintln!($($arg)*);
         }
     }};
 }
 
+/// Prints to stderr the time elapsed since `dur`, labelled with `msg` and the
+/// given source location, when `POLARS_PLUGIN_PERF` is `1` or `true`.
+///
+/// The message and duration are wrapped in ANSI colors when `env.use_color()` is true.
+pub fn perf(
+    env: impl EngineWrapper,
+    msg: &str,
+    dur: std::time::Instant,
+    file: &str,
+    line: u32,
+    column: u32,
+) {
+    if env
+        .get_env_var("POLARS_PLUGIN_PERF")
+        .filter(|s| s == "1" || s == "true")
+        .is_some()
+    {
+        if env.use_color() {
+            eprintln!(
+                "perf: {}:{}:{} \x1b[32m{}\x1b[0m took \x1b[33m{:?}\x1b[0m",
+                file,
+                line,
+                column,
+                msg,
+                dur.elapsed(),
+            );
+        } else {
+            eprintln!(
+                "perf: {}:{}:{} {} took {:?}",
+                file,
+                line,
+                column,
+                msg,
+                dur.elapsed(),
+            );
+        }
+    }
+}
+
 #[derive(Default)]
 pub struct PolarsPlugin {
     pub(crate) cache: Cache,
@@ -52,7 +130,7 @@ impl Plugin for PolarsPlugin {
     ) -> Result<(), LabeledError> {
         if !self.disable_cache_drop {
             let id = CustomValueType::try_from_custom_value(custom_value)?.id();
-            let _ = self.cache.remove(Some(engine), &id, false);
+            let _ = self.cache.remove(engine, &id, false);
         }
         Ok(())
     }
@@ -193,6 +271,23 @@ pub mod test {
         }
     }
 
+    /// Engine stand-in for tests: reads the process environment and ignores GC toggles.
+    struct TestEngineWrapper;
+
+    impl EngineWrapper for TestEngineWrapper {
+        fn get_env_var(&self, key: &str) -> Option<String> {
+            std::env::var(key).ok()
+        }
+
+        fn use_color(&self) -> bool {
+            false
+        }
+
+        fn set_gc_disabled(&self, _disabled: bool) -> Result<(), ShellError> {
+            Ok(())
+        }
+    }
+
     pub fn test_polars_plugin_command(command: &impl PluginCommand) -> Result<(), ShellError> {
         test_polars_plugin_command_with_decls(command, vec![])
     }
@@ -212,7 +307,7 @@ pub mod test {
                 let id = obj.id();
                 plugin
                     .cache
-                    .insert(None, id, obj, Span::test_data())
+                    .insert(TestEngineWrapper, id, obj, Span::test_data())
                     .unwrap();
             }
         }