add polars join_where command (#15635)

# Description

This adds `polars join_where`, which allows joining two dataframes based
on a condition. The command can be used as:

```
➜ let df_a = [[name cash];[Alice 5] [Bob 10]] | polars into-lazy
➜ let df_b = [[item price];[A 3] [B 7] [C 12]] | polars into-lazy
➜ $df_a | polars join_where $df_b ((polars col cash) > (polars col price)) | polars collect
╭───┬───────┬──────┬──────┬───────╮
│ # │ name  │ cash │ item │ price │
├───┼───────┼──────┼──────┼───────┤
│ 0 │ Bob   │   10 │ B    │     7 │
│ 1 │ Bob   │   10 │ A    │     3 │
│ 2 │ Alice │    5 │ A    │     3 │
╰───┴───────┴──────┴──────┴───────╯
```

# User-Facing Changes

- new command `polars join_where`
This commit is contained in:
Matthias Meschede 2025-04-24 23:44:29 +02:00 committed by GitHub
parent 208ebeefab
commit 05c36d1bc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 123 additions and 0 deletions

View File

@ -61,6 +61,7 @@ features = [
"cloud",
"concat_str",
"cross_join",
"iejoin",
"csv",
"cum_agg",
"default",

View File

@ -0,0 +1,119 @@
use crate::{
dataframe::values::{Column, NuDataFrame, NuExpression, NuLazyFrame},
values::CustomValueSupport,
PolarsPlugin,
};
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, Signature, Span, SyntaxShape, Type, Value,
};
/// Plugin command type backing `polars join_where`.
#[derive(Clone)]
pub struct LazyJoinWhere;
impl PluginCommand for LazyJoinWhere {
    type Plugin = PolarsPlugin;

    /// Name under which the command is invoked.
    fn name(&self) -> &str {
        "polars join_where"
    }

    /// Short help text for the command.
    fn description(&self) -> &str {
        "Joins a lazy frame with other lazy frame based on conditions."
    }

    /// Declares two required positionals (`other`, `condition`) and a
    /// `dataframe` -> `dataframe` input/output type.
    fn signature(&self) -> Signature {
        let dataframe = || Type::Custom("dataframe".into());
        let category = Category::Custom("lazyframe".into());

        Signature::build(self.name())
            .required("other", SyntaxShape::Any, "LazyFrame to join with")
            .required("condition", SyntaxShape::Any, "Condition")
            .input_output_type(dataframe(), dataframe())
            .category(category)
    }

    /// Provides one example: joining two lazy frames on `cash > price`,
    /// together with the expected four-column result.
    fn examples(&self) -> Vec<Example> {
        // Expected output, built column by column.
        let names = Column::new(
            "name".to_string(),
            vec![
                Value::test_string("Bob"),
                Value::test_string("Bob"),
                Value::test_string("Alice"),
            ],
        );
        let cash = Column::new(
            "cash".to_string(),
            vec![Value::test_int(10), Value::test_int(10), Value::test_int(5)],
        );
        let items = Column::new(
            "item".to_string(),
            vec![
                Value::test_string("B"),
                Value::test_string("A"),
                Value::test_string("A"),
            ],
        );
        let prices = Column::new(
            "price".to_string(),
            vec![Value::test_int(7), Value::test_int(3), Value::test_int(3)],
        );

        let expected = NuDataFrame::try_from_columns(vec![names, cash, items, prices], None)
            .expect("simple df for test should not fail")
            .into_value(Span::test_data());

        vec![Example {
            description: "Join two lazy dataframes with a condition",
            example: r#"let df_a = ([[name cash];[Alice 5] [Bob 10]] | polars into-lazy)
let df_b = ([[item price];[A 3] [B 7] [C 12]] | polars into-lazy)
$df_a | polars join_where $df_b ((polars col cash) > (polars col price)) | polars collect"#,
            result: Some(expected),
        }]
    }

    /// Joins the pipeline input with the `other` frame using `condition`.
    ///
    /// Positional 0 is converted to a lazy frame, positional 1 to an
    /// expression, and the pipeline input to a lazy frame; any conversion
    /// failure is propagated as a `LabeledError`.
    fn run(
        &self,
        plugin: &Self::Plugin,
        engine: &EngineInterface,
        call: &EvaluatedCall,
        input: PipelineData,
    ) -> Result<PipelineData, LabeledError> {
        // Right-hand side of the join (first positional argument).
        let rhs_value: Value = call.req(0)?;
        let rhs = NuLazyFrame::try_from_value_coerce(plugin, &rhs_value)?.to_polars();

        // Join predicate (second positional argument).
        let predicate_value: Value = call.req(1)?;
        let predicate = NuExpression::try_from_value(plugin, &predicate_value)?.into_polars();

        // Left-hand side comes from the pipeline.
        let lhs_value = input.into_value(call.head)?;
        let lhs = NuLazyFrame::try_from_value_coerce(plugin, &lhs_value)?;
        // Carry the input's `from_eager` flag over to the joined frame.
        let from_eager = lhs.from_eager;

        let joined = lhs
            .to_polars()
            .join_builder()
            .with(rhs)
            .force_parallel(true)
            .join_where(vec![predicate]);

        NuLazyFrame::new(from_eager, joined)
            .to_pipeline_data(plugin, engine, call.head)
            .map_err(LabeledError::from)
    }
}
#[cfg(test)]
mod test {
    use super::LazyJoinWhere;
    use crate::test::test_polars_plugin_command;
    use nu_protocol::ShellError;

    // Hands the command to the shared polars plugin test helper
    // (presumably this exercises the examples declared above — confirm in `crate::test`).
    #[test]
    fn test_examples() -> Result<(), ShellError> {
        test_polars_plugin_command(&LazyJoinWhere)
    }
}

View File

@ -19,6 +19,7 @@ mod first;
mod flatten;
mod get;
mod join;
mod join_where;
mod last;
mod len;
mod lit;
@ -61,6 +62,7 @@ pub use first::FirstDF;
use flatten::LazyFlatten;
pub use get::GetDF;
use join::LazyJoin;
use join_where::LazyJoinWhere;
pub use last::LastDF;
pub use lit::ExprLit;
use query_df::QueryDf;
@ -106,6 +108,7 @@ pub(crate) fn data_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugin
Box::new(LazyFillNull),
Box::new(LazyFlatten),
Box::new(LazyJoin),
Box::new(LazyJoinWhere),
Box::new(reverse::LazyReverse),
Box::new(select::LazySelect),
Box::new(LazySortBy),