Hello everyone.
I'm trying to write a Parquet file as a Delta table to my lakeFS server, which is installed in my local minikube cluster.
I'm running this code:
import fsspec
import pandas as pd
import deltalake
import lakefs_client
from lakefs_client.models import *
from lakefs_client.client import LakeFSClient
import boto3
from lakefs_client.api import objects_api
from deltalake.writer import write_deltalake
import s3fs
import pyarrow as pa
# lakeFS endpoint, obtained from
# `minikube service lakefs-service -n namespace` (port forwarding etc.).
lakefsEndPoint = 'http://127.0.0.1:59786/'
lakefsAccessKey = 'key'
lakefsSecretKey = 'secret'
repo_name = "internal"

import os

# Mirror the lakeFS credentials and endpoint into AWS_* environment variables.
os.environ["AWS_ACCESS_KEY_ID"] = lakefsAccessKey
os.environ["AWS_SECRET_ACCESS_KEY"] = lakefsSecretKey
os.environ["AWS_ENDPOINT"] = lakefsEndPoint
# Not sure about this one, but just did what was suggested.
os.environ["AWS_REGION"] = "us-east-1"

# lakeFS credentials and endpoint for the lakeFS API client.
# NOTE(review): confirm whether `configuration.host` expects the bare endpoint
# or an API base path for the installed lakefs_client version.
configuration = lakefs_client.Configuration()
configuration.username = lakefsAccessKey
configuration.password = lakefsSecretKey
configuration.host = lakefsEndPoint
lakefs = LakeFSClient(configuration)

# Options handed to deltalake for accessing the table location.
storage_options = {
    "AWS_ACCESS_KEY_ID": lakefsAccessKey,
    "AWS_SECRET_ACCESS_KEY": lakefsSecretKey,
    "AWS_ENDPOINT": lakefsEndPoint,
    # Not sure about this one, but just did what was suggested.
    "AWS_REGION": "us-east-1",
    "AWS_STORAGE_ALLOW_HTTP": "true",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

# Reuse the endpoint defined above instead of repeating the URL literal.
fs = s3fs.S3FileSystem(
    endpoint_url=lakefsEndPoint,
    secret=lakefsSecretKey,
    key=lakefsAccessKey,
)

# Reading the source Parquet file works as expected.
with fs.open('s3a://repository/branch/lakes.parquet', 'rb') as f:
    df = pd.read_parquet(f)

# Cast every column to string before writing.
df = df.astype(str)
print(df.head())

# Initialize the Delta table.
deltalake.write_deltalake(
    table_or_uri='s3a://repository/branch/newTable',
    data=df,
    mode='overwrite',
    storage_options=storage_options,
)
And I'm getting this error:
Traceback (most recent call last):
File "C:\Users\project\test2.py", line 59, in <module>
my_new_dt = deltalake.DeltaTable('s3a://repository/branch/deltaTable', storage_options=storage_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\path\deltalake\table.py", line 238, in __init__
self._table = RawDeltaTable(
^^^^^^^^^^^^^^
_internal.DeltaError: Delta protocol violation: Failed to serialize operation: expected value at line 1 column 1
Any idea what's going on?