From b1e4f375058102b6dc16a4bac7f166dbf4b7713b Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Sun, 1 Sep 2024 09:15:44 +0800 Subject: [PATCH] Fix ElasticSearch SQLClient deprecation warning (#41871) * Remove importing SQLClient, and use sql interface directly from elasticsearch client * Fix failing CI test * Remove checking pytest in modules, and patch ElasticSearch directly instead of SQLClient --- .../providers/elasticsearch/hooks/elasticsearch.py | 4 +--- .../elasticsearch/hooks/test_elasticsearch.py | 11 +++++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py index 11039651c0edc..ca90400177791 100644 --- a/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -23,7 +23,6 @@ from deprecated import deprecated from elasticsearch import Elasticsearch -from elasticsearch.client import SqlClient from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.hooks.base import BaseHook @@ -70,11 +69,10 @@ def __init__( self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs) else: self.es = Elasticsearch(self.url, **self.kwargs) - self.es_sql_client = SqlClient(self.es) def execute_sql(self, query: str) -> ObjectApiResponse: sql_query = {"query": query} - return self.es_sql_client.query(body=sql_query) + return self.es.sql.query(body=sql_query) class ElasticsearchSQLHook(DbApiHook): diff --git a/tests/providers/elasticsearch/hooks/test_elasticsearch.py b/tests/providers/elasticsearch/hooks/test_elasticsearch.py index 91965a785defb..9e7dbe2de8165 100644 --- a/tests/providers/elasticsearch/hooks/test_elasticsearch.py +++ b/tests/providers/elasticsearch/hooks/test_elasticsearch.py @@ -18,6 +18,7 @@ from __future__ import annotations from unittest import mock +from unittest.mock import MagicMock import pytest from elasticsearch import Elasticsearch @@ -119,16 +120,18 @@ def test_get_pandas_df(self): self.cur.execute.assert_called_once_with(statement) - @mock.patch("airflow.providers.elasticsearch.hooks.elasticsearch.SqlClient.query") - def test_execute_sql_query(self, mock_query): - mock_query.return_value = { + @mock.patch("airflow.providers.elasticsearch.hooks.elasticsearch.Elasticsearch") + def test_execute_sql_query(self, mock_es): + mock_es_sql_client = MagicMock() + mock_es_sql_client.query.return_value = { "columns": [{"name": "id"}, {"name": "first_name"}], "rows": [[1, "John"], [2, "Jane"]], } + mock_es.return_value.sql = mock_es_sql_client es_connection = ESConnection(host="localhost", port=9200) response = es_connection.execute_sql("SELECT * FROM index1") - mock_query.assert_called_once_with(body={"query": "SELECT * FROM index1"}) + mock_es_sql_client.query.assert_called_once_with(body={"query": "SELECT * FROM index1"}) assert response["rows"] == [[1, "John"], [2, "Jane"]] assert response["columns"] == [{"name": "id"}, {"name": "first_name"}]