From 2d4c802f99fa9bcb3bf510bfee3015fe792b8bd0 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Tue, 25 Mar 2025 12:15:17 -0500 Subject: [PATCH 1/8] Added execute dask utilizing dask expr --- charmpandas/dask_expr.py | 17 +++++++++++++++++ examples/Demo.ipynb | 36 ++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 8 deletions(-) create mode 100644 charmpandas/dask_expr.py diff --git a/charmpandas/dask_expr.py b/charmpandas/dask_expr.py new file mode 100644 index 0000000..d9da49d --- /dev/null +++ b/charmpandas/dask_expr.py @@ -0,0 +1,17 @@ +import dask.dataframe as dd +import charmpandas as pd + +def execute_dask(simplified_expr): # for now replaces dask read parquet with charmpandas read_parquet + print(simplified_expr) + if 'dask.dataframe.dask_expr' in str(type(simplified_expr)): + print(f'{simplified_expr._funcname}') + print(f'Operands - {simplified_expr.operands})') + if simplified_expr._funcname == 'read_parquet': + print("Swapping dask read parquet with charmpandas") + print(f"Reading - {simplified_expr.operands[0]}") + simplified_expr = pd.read_parquet(execute_dask(simplified_expr.operands[0])) + # Need to add similar kind of execs for all other functions + else: + for o in simplified_expr.operands: + o = execute_dask(o) + return simplified_expr \ No newline at end of file diff --git a/examples/Demo.ipynb b/examples/Demo.ipynb index 830a904..7185ef9 100644 --- a/examples/Demo.ipynb +++ b/examples/Demo.ipynb @@ -5,10 +5,22 @@ "execution_count": 1, "id": "2da208e1-3efd-4375-a9f9-739407f1d4eb", "metadata": {}, - "outputs": [], + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'charmpandas'", + "output_type": "error", + "traceback": [ + "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[1;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[1;32mIn[1], line 1\u001b[0m\n\u001b[1;32m----> 1\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mpd\u001b[39;00m\n\u001b[0;32m 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mcharmpandas\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01minterface\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m LocalCluster, CCSInterface\n", + "\u001b[1;31mModuleNotFoundError\u001b[0m: No module named 'charmpandas'" + ] + } + ], "source": [ "import charmpandas as pd\n", - "from charmpandas.interface import SLURMCluster, CCSInterface" + "from charmpandas.interface import LocalCluster, CCSInterface" ] }, { @@ -26,7 +38,8 @@ } ], "source": [ - "cluster = SLURMCluster('mzu-delta-cpu', 'cpu', '/u/bhosale/charmpandas', tasks_per_node=32)\n", + "\n", + "cluster = LocalCluster(min_pes=4,max_pes=4, odf=4,activity_timeout=60)\n", "pd.set_interface(cluster)" ] }, @@ -37,7 +50,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_ids = pd.read_parquet(\"/u/bhosale/charmpandas/examples/user_ids_small.parquet\")" + "df_ids = pd.read_parquet(\"../user_ids_small.parquet\")" ] }, { @@ -47,7 +60,7 @@ "metadata": {}, "outputs": [], "source": [ - "df_ages = pd.read_parquet(\"/u/bhosale/charmpandas/examples/ages_small.parquet\")" + "df_ages = pd.read_parquet(\"../ages_small.parquet\")" ] }, { @@ -140,12 +153,19 @@ "id": "32b3b987-00f1-4a03-9723-f122f8b44a23", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "import dask.dataframe as dd\n", + "from charmpandas import execute_dask\n", + "\n", + "df = 
dd.read_parquet(\"../user_ids_small.parquet\")\n", + "execute_dask(df.simplify())\n", + "\n" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -159,7 +179,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.1" + "version": "3.12.4" } }, "nbformat": 4, From 6d84f72921e19314336def723549fc2942cdfaa4 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Tue, 1 Apr 2025 20:40:07 -0500 Subject: [PATCH 2/8] Added most features --- charmpandas/dask_expr.py | 80 ++++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 12 deletions(-) diff --git a/charmpandas/dask_expr.py b/charmpandas/dask_expr.py index d9da49d..d09bf3c 100644 --- a/charmpandas/dask_expr.py +++ b/charmpandas/dask_expr.py @@ -1,17 +1,73 @@ import dask.dataframe as dd import charmpandas as pd - +from dask.dataframe.dask_expr._expr import Expr +from dask.dataframe.dask_expr._groupby import GroupBy +from charmpandas.interface import LocalCluster, CCSInterface +from functools import lru_cache +cluster = LocalCluster(min_pes=4,max_pes=4, odf=4,activity_timeout=60) +pd.set_interface(cluster) def execute_dask(simplified_expr): # for now replaces dask read parquet with charmpandas read_parquet + callables = ['read_parquet'] print(simplified_expr) - if 'dask.dataframe.dask_expr' in str(type(simplified_expr)): - print(f'{simplified_expr._funcname}') - print(f'Operands - {simplified_expr.operands})') - if simplified_expr._funcname == 'read_parquet': - print("Swapping dask read parquet with charmpandas") - print(f"Reading - {simplified_expr.operands[0]}") - simplified_expr = pd.read_parquet(execute_dask(simplified_expr.operands[0])) - # Need to add similar kind of execs for all other functions + if not (isinstance(simplified_expr, Expr) or isinstance(simplified_expr, GroupBy)): + print(f'Operation - {simplified_expr} not supported in charm pandas') + return simplified_expr + else: + if isinstance(simplified_expr, GroupBy): + return simplified_expr + # return charm_mapper('groupby', [simplified_expr.by]) + # elif simplified_expr._funcname in callables: + # f = charm_mapper(simplified_expr._funcname) + # args = [execute_dask(o) for o in simplified_expr.operands] + # return f(args) else: - for o in simplified_expr.operands: - o = execute_dask(o) - return simplified_expr \ No newline at end of file + args = [] + if '_funcname' not in dir(simplified_expr): + print(f'Operation - {simplified_expr} not supported') + return simplified_expr + try: + args = [execute_dask(o) for o in simplified_expr.operands] + except: + print("No operands found") + return charm_mapper(simplified_expr._funcname, args) + + +@lru_cache(maxsize=None) +def read_parquet(path): + return pd.read_parquet(path) + +def charm_mapper(func_name, args): + # Dataframe operations + if func_name == 'read_parquet': + return read_parquet(args[0]) + elif func_name == 'projection': + return args[0][args[1]] + elif func_name == 'merge': + return args[0].merge(args[1], how=args[2], left_on=args[3], right_on=args[4]) + elif func_name == 'groupby': # Difficult to integrate with other expr since groupby is not an expr + return args[0].groupby(args[1]) + elif func_name == 'add': + return args[0] + args[1] + elif func_name == 'sub': + return args[0] - args[1] + elif func_name == 'mul': + return args[0] * args[1] + elif func_name == 'div': + return args[0] / args[1] + elif func_name == 'lt': + return args[0] < args[1] + elif func_name 
== 'le': + return args[0] <= args[1] + elif func_name == 'gt': + return args[0] > args[1] + elif func_name == 'ge': + return args[0] >= args[1] + elif func_name == 'eq': + return args[0] == args[1] + elif func_name == 'ne': + return args[0] != args[1] + elif func_name == 'count': + return args[0].count() + elif func_name == 'sum': + return args[0].sum() + return None \ No newline at end of file From 8fb0ed959b311ebfc7a0c023843209a297132d5c Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Sun, 6 Apr 2025 17:48:30 -0500 Subject: [PATCH 3/8] group by feature --- charmpandas/dask_expr.py | 54 ++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/charmpandas/dask_expr.py b/charmpandas/dask_expr.py index d09bf3c..b7320fe 100644 --- a/charmpandas/dask_expr.py +++ b/charmpandas/dask_expr.py @@ -4,33 +4,32 @@ from dask.dataframe.dask_expr._groupby import GroupBy from charmpandas.interface import LocalCluster, CCSInterface from functools import lru_cache +import copy cluster = LocalCluster(min_pes=4,max_pes=4, odf=4,activity_timeout=60) pd.set_interface(cluster) -def execute_dask(simplified_expr): # for now replaces dask read parquet with charmpandas read_parquet - callables = ['read_parquet'] - print(simplified_expr) - if not (isinstance(simplified_expr, Expr) or isinstance(simplified_expr, GroupBy)): - print(f'Operation - {simplified_expr} not supported in charm pandas') - return simplified_expr +def execute_dask(dask_obj, depth=0): # for now replaces dask read parquet with charmpandas read_parquet + # print(simplified_expr) + if not (isinstance(dask_obj, Expr) or isinstance(dask_obj, GroupBy)): + # print(f'Operation - {simplified_expr} not supported in charm pandas') + return dask_obj else: - if isinstance(simplified_expr, GroupBy): - return simplified_expr - # return charm_mapper('groupby', [simplified_expr.by]) - # elif simplified_expr._funcname in callables: - # f = charm_mapper(simplified_expr._funcname) - # args = [execute_dask(o) for o in simplified_expr.operands] - # return f(args) + if isinstance(dask_obj, GroupBy): + pd_df = execute_dask(dask_obj.obj.expr) + return pd_df.groupby(dask_obj.by) else: args = [] - if '_funcname' not in dir(simplified_expr): - print(f'Operation - {simplified_expr} not supported') - return simplified_expr + if '_funcname' not in dir(dask_obj): + # print(f'Operation - {simplified_expr} not supported') + return dask_obj try: - args = [execute_dask(o) for o in simplified_expr.operands] - except: - print("No operands found") - return charm_mapper(simplified_expr._funcname, args) - + args = [execute_dask(o, depth+1) for o in dask_obj.operands] + except Exception as e: + print(f"Error in executing {dask_obj}: {e}") + result = charm_mapper(dask_obj._funcname, args) + # Clear the cache only at the top level (depth=0) + if depth == 0: + read_parquet.cache_clear() + return result @lru_cache(maxsize=None) def read_parquet(path): @@ -70,4 +69,15 @@ def charm_mapper(func_name, args): return args[0].count() elif func_name == 'sum': return args[0].sum() - return None \ No newline at end of file + elif func_name == 'assign': + print(args) + if len(args) == 2: # Assign a df + args[0] = args[1] + return args[0] + else: # Assign a column + args[0][args[1]] = args[2] + return args[0] + # Add assignment operations + return None + +# New function to handle groupby operations From d9899069a2e4e1c6599815e15e22628256249cd2 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Mon, 7 Apr 2025 09:26:28 -0500 Subject: [PATCH 4/8] demo --- 
examples/Demo.ipynb | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/examples/Demo.ipynb b/examples/Demo.ipynb index 7185ef9..c279d23 100644 --- a/examples/Demo.ipynb +++ b/examples/Demo.ipynb @@ -43,6 +43,14 @@ "pd.set_interface(cluster)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "c4f0e291", + "metadata": {}, + "outputs": [], + "source": [] + }, { "cell_type": "code", "execution_count": 3, @@ -155,10 +163,19 @@ "outputs": [], "source": [ "import dask.dataframe as dd\n", - "from charmpandas import execute_dask\n", + "from charmpandas.dask_expr import execute_dask\n", "\n", "df = dd.read_parquet(\"../user_ids_small.parquet\")\n", - "execute_dask(df.simplify())\n", + "\n", + "df.city = df.city + df.city\n", + "\n", + "df = df.simplify()\n", + "\n", + "res = df.groupby(['city'])\n", + "\n", + "pd_df = execute_dask(res) \n", + "\n", + "pd_df.get()\n", "\n" ] } From 48ba23e4ae57d6c3967e8a8b669cf2a2f7026f63 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Tue, 22 Apr 2025 18:46:39 -0500 Subject: [PATCH 5/8] Added column read parquet --- charmpandas/dask_expr.py | 6 +++--- charmpandas/interface.py | 6 +++++- examples/Demo.ipynb | 6 +++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/charmpandas/dask_expr.py b/charmpandas/dask_expr.py index b7320fe..0188413 100644 --- a/charmpandas/dask_expr.py +++ b/charmpandas/dask_expr.py @@ -32,13 +32,13 @@ def execute_dask(dask_obj, depth=0): # for now replaces dask read parquet with c return result @lru_cache(maxsize=None) -def read_parquet(path): - return pd.read_parquet(path) +def read_parquet(path, cols): + return pd.read_parquet(path, cols) def charm_mapper(func_name, args): # Dataframe operations if func_name == 'read_parquet': - return read_parquet(args[0]) + return read_parquet(args[0], tuple(args[1])) elif func_name == 'projection': return args[0][args[1]] elif func_name == 'merge': diff --git a/charmpandas/interface.py b/charmpandas/interface.py index 3a66cfb..f68e83a 100644 --- a/charmpandas/interface.py +++ b/charmpandas/interface.py @@ -183,7 +183,7 @@ def get_deletion_header(self): def mark_deletion(self, table_name): self.deletion_buffer.append(table_name) - def read_parquet(self, table_name, file_path): + def read_parquet(self, table_name, file_path, columns=[]): self.activity_handler() cmd = self.get_header(self.epoch) @@ -191,6 +191,10 @@ def read_parquet(self, table_name, file_path): gcmd += to_bytes(Operations.read, 'i') gcmd += to_bytes(table_name, 'i') gcmd += string_bytes(file_path) + if columns: + gcmd += to_bytes(len(columns), 'i') + for col in columns: + gcmd += string_bytes(col) cmd += to_bytes(len(gcmd), 'i') cmd += gcmd diff --git a/examples/Demo.ipynb b/examples/Demo.ipynb index c279d23..1210b65 100644 --- a/examples/Demo.ipynb +++ b/examples/Demo.ipynb @@ -165,15 +165,15 @@ "import dask.dataframe as dd\n", "from charmpandas.dask_expr import execute_dask\n", "\n", - "df = dd.read_parquet(\"../user_ids_small.parquet\")\n", + "df = dd.read_parquet(\"../user_ids_small.parquet\", ['city'])\n", "\n", "df.city = df.city + df.city\n", "\n", "df = df.simplify()\n", "\n", - "res = df.groupby(['city'])\n", + "# res = df.groupby(['city'])\n", "\n", - "pd_df = execute_dask(res) \n", + "pd_df = execute_dask(df.expr) \n", "\n", "pd_df.get()\n", "\n" From 822f1230435a397c8963bd1645c4c95186450b70 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Fri, 25 Apr 2025 10:29:48 -0500 Subject: [PATCH 6/8] added col wise read --- charmpandas/dataframe.py | 7 +++-- 
charmpandas/operations.py | 4 +-- src/partition.cpp | 60 ++++++++++++++++++++++++++++++--------- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/charmpandas/dataframe.py b/charmpandas/dataframe.py index 0d680c3..1772d47 100644 --- a/charmpandas/dataframe.py +++ b/charmpandas/dataframe.py @@ -151,11 +151,14 @@ def sum(self): class DataFrame(object): - def __init__(self, data): + def __init__(self, data, cols=None): interface = get_interface() self.name = get_table_name() if isinstance(data, str): - interface.read_parquet(self.name, data) + if cols: + interface.read_parquet(self.name, data, cols) + else: + interface.read_parquet(self.name, data) elif data == None: # this is a result of some operation pass diff --git a/charmpandas/operations.py b/charmpandas/operations.py index 9c97445..529ada3 100644 --- a/charmpandas/operations.py +++ b/charmpandas/operations.py @@ -1,7 +1,7 @@ from charmpandas.dataframe import get_interface, get_table_name, DataFrame -def read_parquet(file_path): - return DataFrame(file_path) +def read_parquet(file_path, cols=None): + return DataFrame(file_path, cols) def concat(objs): if (objs and len(objs) == 0) or objs is None: diff --git a/src/partition.cpp b/src/partition.cpp index 784bcd9..5924e4d 100644 --- a/src/partition.cpp +++ b/src/partition.cpp @@ -329,9 +329,29 @@ void Partition::operation_read(char* cmd) int table_name = extract(cmd); int path_size = extract(cmd); std::string file_path(cmd, path_size); - if (thisIndex == 0) + cmd += path_size; + + // Extract column selection if present + std::vector columns; + int num_columns = extract(cmd); + for (int i = 0; i < num_columns; i++) { + int col_size = extract(cmd); + columns.push_back(std::string(cmd, col_size)); + cmd += col_size; + } + + if (thisIndex == 0) { CkPrintf("[%d] Reading file: %s\n", thisIndex, file_path.c_str()); - read_parquet(table_name, file_path); + if (!columns.empty()) { + CkPrintf("[%d] Selected columns:", thisIndex); + for (const auto& col : columns) { + CkPrintf(" %s", col.c_str()); + } + CkPrintf("\n"); + } + } + + read_parquet(table_name, file_path, columns); complete_operation(); } @@ -710,7 +730,7 @@ arrow::Datum Partition::traverse_ast(char* &msg) } } -void Partition::read_parquet(int table_name, std::string file_path) +void Partition::read_parquet(int table_name, std::string file_path, const std::vector& columns) { std::vector files = get_matching_files(file_path); std::shared_ptr input_file; @@ -724,18 +744,31 @@ void Partition::read_parquet(int table_name, std::string file_path) std::string file = files[i]; input_file = arrow::io::ReadableFile::Open(file).ValueOrDie(); - // Create a ParquetFileReader instance + // Create a ParquetFileReader instance with options for column selection std::unique_ptr reader; parquet::arrow::OpenFile(input_file, arrow::default_memory_pool(), &reader); // Get the file metadata std::shared_ptr file_metadata = reader->parquet_reader()->metadata(); + // Get schema and create column selection + std::shared_ptr schema; + reader->GetSchema(&schema); + std::vector column_indices; + + // Convert column names to indices if columns are specified + if (!columns.empty()) { + for (const auto& col : columns) { + int idx = schema->GetFieldIndex(col); + if (idx != -1) { + column_indices.push_back(idx); + } else { + CkPrintf("Warning: Column '%s' not found in parquet file\n", col.c_str()); + } + } + } + int num_rows = file_metadata->num_rows(); - - //if (thisIndex == 0) - // CkPrintf("[%d] Reading %i rows from %s\n", thisIndex, num_rows, 
file.c_str()); - int nrows_per_partition = num_rows / num_partitions; int start_row = nrows_per_partition * thisIndex; int nextra_rows = num_rows - num_partitions * nrows_per_partition; @@ -772,9 +805,13 @@ void Partition::read_parquet(int table_name, std::string file_path) // Calculate how many rows to read from this row group int64_t rows_in_group = std::min(rows_to_read, row_group_num_rows - start_row); - // Read the rows + // Read the rows with column selection TablePtr table; - reader->ReadRowGroup(i, &table); + if (column_indices.empty()) { + reader->ReadRowGroup(i, &table); + } else { + reader->ReadRowGroup(i, column_indices, &table); + } TablePtr sliced_table = table->Slice(start_row, rows_in_group); row_tables.push_back(sliced_table); @@ -808,9 +845,6 @@ void Partition::read_parquet(int table_name, std::string file_path) read_tables->num_rows()).ValueOrDie(); tables[table_name] = set_column(read_tables, "home_partition", arrow::Datum( arrow::ChunkedArray::Make({home_partition_array}).ValueOrDie()))->CombineChunks().ValueOrDie(); - - - //CkPrintf("[%d] Read number of rows = %i\n", thisIndex, combined->num_rows()); } Aggregator::Aggregator(CProxy_Main main_proxy_) From 78d7eab92f3fbd013a1ecca1870f365d6c910087 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Sat, 3 May 2025 13:01:26 -0500 Subject: [PATCH 7/8] filter read parquet --- charmpandas/dataframe.py | 7 ++----- charmpandas/interface.py | 16 ++++++++++++++- charmpandas/operations.py | 4 ++-- src/partition.cpp | 41 +++++++++++++++++++++++++++++++++++++-- src/partition.hpp | 5 ++++- 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/charmpandas/dataframe.py b/charmpandas/dataframe.py index 1772d47..3b96618 100644 --- a/charmpandas/dataframe.py +++ b/charmpandas/dataframe.py @@ -151,14 +151,11 @@ def sum(self): class DataFrame(object): - def __init__(self, data, cols=None): + def __init__(self, data, cols=None, filters=None): interface = get_interface() self.name = get_table_name() if isinstance(data, str): - if cols: - interface.read_parquet(self.name, data, cols) - else: - interface.read_parquet(self.name, data) + interface.read_parquet(self.name, data, cols, filters) elif data == None: # this is a result of some operation pass diff --git a/charmpandas/interface.py b/charmpandas/interface.py index f68e83a..d938967 100644 --- a/charmpandas/interface.py +++ b/charmpandas/interface.py @@ -183,7 +183,7 @@ def get_deletion_header(self): def mark_deletion(self, table_name): self.deletion_buffer.append(table_name) - def read_parquet(self, table_name, file_path, columns=[]): + def read_parquet(self, table_name, file_path, columns=None, filters=None): self.activity_handler() cmd = self.get_header(self.epoch) @@ -191,10 +191,24 @@ def read_parquet(self, table_name, file_path, columns=[]): gcmd += to_bytes(Operations.read, 'i') gcmd += to_bytes(table_name, 'i') gcmd += string_bytes(file_path) + + # Add columns if columns: gcmd += to_bytes(len(columns), 'i') for col in columns: gcmd += string_bytes(col) + else: + gcmd += to_bytes(0, 'i') # No columns specified + + # Add filters + if filters: + gcmd += to_bytes(len(filters), 'i') # Number of filters + for col, op, value in filters: + gcmd += string_bytes(col) # Column name + gcmd += string_bytes(op) # Operator + gcmd += string_bytes(str(value)) # Value (converted to string) + else: + gcmd += to_bytes(0, 'i') # No filters specified cmd += to_bytes(len(gcmd), 'i') cmd += gcmd diff --git a/charmpandas/operations.py b/charmpandas/operations.py index 529ada3..4492aac 100644 --- 
a/charmpandas/operations.py +++ b/charmpandas/operations.py @@ -1,7 +1,7 @@ from charmpandas.dataframe import get_interface, get_table_name, DataFrame -def read_parquet(file_path, cols=None): - return DataFrame(file_path, cols) +def read_parquet(file_path, cols=None, filters=None): + return DataFrame(file_path, cols, filters) def concat(objs): if (objs and len(objs) == 0) or objs is None: diff --git a/src/partition.cpp b/src/partition.cpp index 5924e4d..e3e6592 100644 --- a/src/partition.cpp +++ b/src/partition.cpp @@ -340,6 +340,28 @@ void Partition::operation_read(char* cmd) cmd += col_size; } + // Extract filters if present + std::vector> filters; + int num_filters = extract(cmd); + for (int i = 0; i < num_filters; i++) { + // Extract column name + int col_name_size = extract(cmd); + std::string col_name(cmd, col_name_size); + cmd += col_name_size; + + // Extract operator + int op_size = extract(cmd); + std::string op(cmd, op_size); + cmd += op_size; + + // Extract value + int value_size = extract(cmd); + std::string value(cmd, value_size); + cmd += value_size; + + filters.push_back(std::make_tuple(col_name, op, value)); + } + if (thisIndex == 0) { CkPrintf("[%d] Reading file: %s\n", thisIndex, file_path.c_str()); if (!columns.empty()) { @@ -349,9 +371,19 @@ void Partition::operation_read(char* cmd) } CkPrintf("\n"); } + if (!filters.empty()) { + CkPrintf("[%d] Applying filters:", thisIndex); + for (const auto& filter : filters) { + CkPrintf(" %s %s %s;", + std::get<0>(filter).c_str(), + std::get<1>(filter).c_str(), + std::get<2>(filter).c_str()); + } + CkPrintf("\n"); + } } - read_parquet(table_name, file_path, columns); + read_parquet(table_name, file_path, columns, filters); complete_operation(); } @@ -730,7 +762,8 @@ arrow::Datum Partition::traverse_ast(char* &msg) } } -void Partition::read_parquet(int table_name, std::string file_path, const std::vector& columns) +void Partition::read_parquet(int table_name, std::string file_path, const std::vector& columns, + const std::vector>& filters) { std::vector files = get_matching_files(file_path); std::shared_ptr input_file; @@ -812,6 +845,10 @@ void Partition::read_parquet(int table_name, std::string file_path, const std::v } else { reader->ReadRowGroup(i, column_indices, &table); } + + // Note: We're skipping the dataset-based filtering for now since it caused errors + // Filter implementation would need the arrow::dataset module properly included + TablePtr sliced_table = table->Slice(start_row, rows_in_group); row_tables.push_back(sliced_table); diff --git a/src/partition.hpp b/src/partition.hpp index 4da7074..fb304a1 100644 --- a/src/partition.hpp +++ b/src/partition.hpp @@ -15,6 +15,7 @@ #include "arrow/acero/exec_plan.h" #include #include "arrow/compute/expression.h" +#include "arrow/dataset/file_ipc.h" #include "types.hpp" #include "partition.decl.h" @@ -284,7 +285,9 @@ class Partition : public CBase_Partition arrow::Datum traverse_ast(char* &msg); - void read_parquet(int table_name, std::string file_path); + void read_parquet(int table_name, std::string file_path, + const std::vector& columns = std::vector(), + const std::vector>& filters = std::vector>()); template void reduce_scalar(ScalarPtr& scalar, AggregateOperation& op); From 858b5bb670421ae52751fcc667269470b37c36d2 Mon Sep 17 00:00:00 2001 From: rkmohan2 Date: Sat, 3 May 2025 14:32:22 -0500 Subject: [PATCH 8/8] filter read parquet for dask expr --- charmpandas/dask_expr.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/charmpandas/dask_expr.py 
b/charmpandas/dask_expr.py index 0188413..f629b80 100644 --- a/charmpandas/dask_expr.py +++ b/charmpandas/dask_expr.py @@ -32,13 +32,16 @@ def execute_dask(dask_obj, depth=0): # for now replaces dask read parquet with c return result @lru_cache(maxsize=None) -def read_parquet(path, cols): - return pd.read_parquet(path, cols) +def read_parquet(path, cols, filters): + return pd.read_parquet(path, cols, filters) def charm_mapper(func_name, args): # Dataframe operations if func_name == 'read_parquet': - return read_parquet(args[0], tuple(args[1])) + file = args[0] + cols = tuple(args[1]) if args[1] else tuple([]) + filters = tuple(args[2]) if args[2] else tuple([]) + return read_parquet(args[0], cols, filters) elif func_name == 'projection': return args[0][args[1]] elif func_name == 'merge':
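
A minimal end-to-end sketch of the path these patches add (illustrative only, not part of any patch): the parquet path, column name, and filter literal below are assumed for the example, and the call pattern mirrors the Demo notebook from PATCH 5, extended with the filter support introduced in PATCH 8. `execute_dask` walks the simplified dask expression tree and replays the read_parquet/projection/filter operands through charmpandas.

    import dask.dataframe as dd
    from charmpandas.dask_expr import execute_dask

    # Build a lazy dask expression with column pruning and a row filter.
    # Path, column, and filter value are placeholders for illustration.
    df = dd.read_parquet("../user_ids_small.parquet",
                         columns=["city"],
                         filters=[("city", "==", "Chicago")])

    df = df.simplify()             # simplify the expression tree, as in the demo notebook
    pd_df = execute_dask(df.expr)  # replay the simplified tree on charmpandas
    pd_df.get()                    # materialize the charmpandas result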