eland.pandas_to_eland#

eland.pandas_to_eland(pd_df: DataFrame, es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch], es_dest_index: str, es_if_exists: str = 'fail', es_refresh: bool = False, es_dropna: bool = False, es_type_overrides: Optional[Mapping[str, str]] = None, es_verify_mapping_compatibility: bool = True, thread_count: int = 4, chunksize: Optional[int] = None, use_pandas_index_for_es_ids: bool = True) DataFrame#

将 pandas DataFrame 追加到 Elasticsearch 索引。主要用于测试。修改 Elasticsearch 目标索引。

参数#

es_client: Elasticsearch 客户端参数
  • elasticsearch-py 参数或

  • elasticsearch-py 实例

es_dest_index: str

要追加到的 Elasticsearch 索引的名称

es_if_exists{‘fail’, ‘replace’, ‘append’}, 默认 ‘fail’

如果索引已存在,如何操作。

  • fail: 抛出 ValueError。

  • replace: 在插入新值之前删除索引。

  • append: 将新值插入到现有索引中。如果不存在则创建。

es_refresh: bool, 默认 ‘False’

在批量索引后刷新 es_dest_index

es_dropna: bool, 默认 ‘False’
  • True: 删除缺失值(参见 pandas.Series.dropna)

  • False: 包含缺失值 - 可能会导致批量操作失败

es_type_overrides: dict, 默认 None

field_name: es_data_type 字典,覆盖默认的 es 数据类型

es_verify_mapping_compatibility: bool, 默认 ‘True’
  • True: 验证 DataFrame 架构是否与 Elasticsearch 索引架构匹配

  • False: 不验证架构

thread_count: int

用于批量请求的线程数量

chunksize: int, 默认 None

在批量索引到 Elasticsearch 之前读取的 pandas.DataFrame 行数

use_pandas_index_for_es_ids: bool, 默认 ‘True’
  • True: pandas.DataFrame.index 字段将用于填充 Elasticsearch ‘_id’ 字段。

  • False: 在索引到 Elasticsearch 时忽略 pandas.DataFrame.index

返回#

eland.Dataframe

引用 destination_index 中数据的 eland.DataFrame

示例#

>>> pd_df = pd.DataFrame(data={'A': 3.141,
...                            'B': 1,
...                            'C': 'foo',
...                            'D': pd.Timestamp('20190102'),
...                            'E': [1.0, 2.0, 3.0],
...                            'F': False,
...                            'G': [1, 2, 3],
...                            'H': 'Long text - to be indexed as es type text'},
...                      index=['0', '1', '2'])
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
       A  B  ...  G                                          H
0  3.141  1  ...  1  Long text - to be indexed as es type text
1  3.141  1  ...  2  Long text - to be indexed as es type text
2  3.141  1  ...  3  Long text - to be indexed as es type text

[3 rows x 8 columns]
>>> pd_df.dtypes
A           float64
B             int64
C            object
D    datetime64[ns]
E           float64
F              bool
G             int64
H            object
dtype: object

pandas.DataFrame 转换为 eland.DataFrame - 这将创建一个名为 pandas_to_eland 的 Elasticsearch 索引。如果存在则覆盖现有的 Elasticsearch 索引 if_exists=”replace”,并在返回时同步索引以使其可读 refresh=True

>>> ed_df = ed.pandas_to_eland(pd_df,
...                            'http://localhost:9200',
...                            'pandas_to_eland',
...                            es_if_exists="replace",
...                            es_refresh=True,
...                            es_type_overrides={'H':'text'}) # index field 'H' as text not keyword
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
       A  B  ...  G                                          H
0  3.141  1  ...  1  Long text - to be indexed as es type text
1  3.141  1  ...  2  Long text - to be indexed as es type text
2  3.141  1  ...  3  Long text - to be indexed as es type text

[3 rows x 8 columns]
>>> ed_df.dtypes
A           float64
B             int64
C            object
D    datetime64[ns]
E           float64
F              bool
G             int64
H            object
dtype: object

另请参见#

eland.eland_to_pandas: 从 eland.DataFrame 创建 pandas.Dataframe