Eirik Knævelsrud

08/24/2023, 8:32 AM
hello guys. I'm trying to write a parquet file as a Delta table to my lakeFS server installed in my local minikube cluster. I'm running this code:

import fsspec
import pandas as pd
import deltalake
import lakefs_client
from lakefs_client.models import *
from lakefs_client.client import LakeFSClient
import boto3
from lakefs_client.api import objects_api
from deltalake.writer import write_deltalake
import s3fs
import pyarrow as pa

lakefsEndPoint = 'http://127.0.0.1:59786/'  # minikube service lakefs-service -n namespace (forwarding port etc.)
lakefsAccessKey = 'key'
lakefsSecretKey = 'secret'
repo_name = "internal"

import os
os.environ["AWS_ACCESS_KEY_ID"] = lakefsAccessKey
os.environ["AWS_SECRET_ACCESS_KEY"] = lakefsSecretKey
os.environ["AWS_ENDPOINT"] = lakefsEndPoint
os.environ["AWS_REGION"] = "us-east-1"  # Not sure about this one but just did what you said.

# lakeFS credentials and endpoint
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint
lakefs = LakeFSClient(configuration)

storage_options = {
    "AWS_ACCESS_KEY_ID": lakefsAccessKey,
    "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
    "AWS_ENDPOINT": lakefsEndPoint,
    "AWS_REGION": "us-east-1",  # Not sure about this one but just did what you said.
    "AWS_STORAGE_ALLOW_HTTP": "true",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

fs = s3fs.S3FileSystem(
    endpoint_url="http://127.0.0.1:59786/",
    secret=lakefsSecretKey,
    key=lakefsAccessKey,
)

with fs.open('s3a://repository/branch/lakes.parquet', 'rb') as f:  # This works as expected
    df = pd.read_parquet(f)
    df = df.astype(str)
    print(df.head())

# Initialize the Delta table
deltalake.write_deltalake(
    table_or_uri='s3a://repository/branch/newTable',
    data=df,
    mode='overwrite',
    storage_options=storage_options,
)

AND I'm getting this error:

Traceback (most recent call last):
  File "C:\Users\project\test2.py", line 59, in <module>
    my_new_dt = deltalake.DeltaTable('s3a://repository/branch/deltaTable', storage_options=storage_options)
  File "C:\path\deltalake\table.py", line 238, in __init__
    self._table = RawDeltaTable(
_internal.DeltaError: Delta protocol violation: Failed to serialize operation: expected value at line 1 column 1

Any idea of what's going on?
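The "expected value at line 1 column 1" part looks like a JSON parse error raised while delta-rs reads the table's transaction log. One way to see what it is choking on is to inspect the _delta_log objects directly with the s3fs handle already created above; a minimal debugging sketch, assuming the table path matches whatever line 59 actually opens:

# Sketch: inspect the Delta transaction log that delta-rs tries to parse.
# An empty or non-JSON 00000000000000000000.json commit file would produce
# exactly an "expected value at line 1 column 1" failure.
log_prefix = 'repository/branch/newTable/_delta_log'
for path in fs.ls(log_prefix):            # list commit files, if any
    print(path, fs.size(path))            # a 0-byte commit file is a red flag
    with fs.open(path, 'rb') as f:
        print(f.read(200))                # the first bytes should be JSON ('{')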

Itagyba Abondanza Kuhlmann

08/24/2023, 11:34 PM
It seems that the exact line that is failing is not in the piece of code you shared.

Eirik Knævelsrud

08/25/2023, 2:10 AM
Well, the thing here is that I call this deltalake.DeltaTable function, as I showed, and it is part of this library: pip install deltalake. The RawDeltaTable() method is a subsequent call made by that function; in other words, it happens inside the pip install deltalake package. https://github.com/delta-io/delta-rs/blob/main/python/deltalake/table.py Look at line 239.
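For context, the constructor being referenced looks roughly like this in delta-rs (a paraphrased sketch of table.py, not the exact source; argument names vary by version):

# Paraphrased sketch of deltalake/table.py: DeltaTable.__init__ hands the
# URI and storage options straight to the Rust-backed RawDeltaTable.
class DeltaTable:
    def __init__(self, table_uri, version=None, storage_options=None):
        self._storage_options = storage_options
        self._table = RawDeltaTable(   # around line 239, where the error surfaces
            str(table_uri),
            version=version,
            storage_options=storage_options,
        )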

Itagyba Abondanza Kuhlmann

08/26/2023, 12:51 AM
I was referring to line 59 in test2.py, which starts with a variable (my_new_dt) I couldn't find in your example:
File "C:\Users\project\test2.py", line 59, in <module>
my_new_dt = deltalake.DeltaTable('s3a://repository/branch/deltaTable', storage_options=storage_options)

Eirik Knævelsrud

08/28/2023, 7:03 AM
Ok, I see what you mean. This is the code that, down the tree, ends up calling this deltalake.DeltaTable function:

deltalake.write_deltalake(table_or_uri='s3a://repository/branch/newTable', data=df, mode='overwrite', storage_options=storage_options)

The call chain: deltalake.write_deltalake() --> deltalake.DeltaTable() --> self._table = RawDeltaTable() (the constructor).

Itagyba Abondanza Kuhlmann

08/28/2023, 8:35 PM
Well, it has its own literal S3 path, which differs from the one you provide upstream, doesn't it?
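That mismatch between the written path (.../newTable) and the one in the traceback (.../deltaTable) is easy to rule out by keeping a single URI variable, so the write and the later open cannot drift apart; a minimal sketch, assuming 'newTable' is the intended table name:

# Sketch: one URI for both the write and the subsequent open, so the path
# passed to DeltaTable cannot silently diverge from the path written to.
table_uri = 's3a://repository/branch/newTable'
deltalake.write_deltalake(table_or_uri=table_uri, data=df,
                          mode='overwrite', storage_options=storage_options)
my_new_dt = deltalake.DeltaTable(table_uri, storage_options=storage_options)
print(my_new_dt.version(), my_new_dt.files())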