Skip to content

Commit

Permalink
Fix ElasticSearch SQLClient deprecation warning (apache#41871)
Browse files Browse the repository at this point in the history
* Remove importing SQLClient, and use sql interface directly from elasticsearch client

* Fix failing CI test

* Remove checking pytest in modules, and patch ElasticSearch directly instead of SQLClient
  • Loading branch information
Owen-CH-Leung authored Sep 1, 2024
1 parent 3b76ec9 commit b1e4f37
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
4 changes: 1 addition & 3 deletions airflow/providers/elasticsearch/hooks/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from deprecated import deprecated
from elasticsearch import Elasticsearch
from elasticsearch.client import SqlClient

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
Expand Down Expand Up @@ -70,11 +69,10 @@ def __init__(
self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs)
else:
self.es = Elasticsearch(self.url, **self.kwargs)
self.es_sql_client = SqlClient(self.es)

def execute_sql(self, query: str) -> ObjectApiResponse:
sql_query = {"query": query}
return self.es_sql_client.query(body=sql_query)
return self.es.sql.query(body=sql_query)


class ElasticsearchSQLHook(DbApiHook):
Expand Down
11 changes: 7 additions & 4 deletions tests/providers/elasticsearch/hooks/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from unittest import mock
from unittest.mock import MagicMock

import pytest
from elasticsearch import Elasticsearch
Expand Down Expand Up @@ -119,16 +120,18 @@ def test_get_pandas_df(self):

self.cur.execute.assert_called_once_with(statement)

@mock.patch("airflow.providers.elasticsearch.hooks.elasticsearch.SqlClient.query")
def test_execute_sql_query(self, mock_query):
mock_query.return_value = {
@mock.patch("airflow.providers.elasticsearch.hooks.elasticsearch.Elasticsearch")
def test_execute_sql_query(self, mock_es):
mock_es_sql_client = MagicMock()
mock_es_sql_client.query.return_value = {
"columns": [{"name": "id"}, {"name": "first_name"}],
"rows": [[1, "John"], [2, "Jane"]],
}
mock_es.return_value.sql = mock_es_sql_client

es_connection = ESConnection(host="localhost", port=9200)
response = es_connection.execute_sql("SELECT * FROM index1")
mock_query.assert_called_once_with(body={"query": "SELECT * FROM index1"})
mock_es_sql_client.query.assert_called_once_with(body={"query": "SELECT * FROM index1"})

assert response["rows"] == [[1, "John"], [2, "Jane"]]
assert response["columns"] == [{"name": "id"}, {"name": "first_name"}]
Expand Down

0 comments on commit b1e4f37

Please sign in to comment.