[Improvement]: Change scope of amoro:spark to runtime in `ams/server` dependency. (#2063)

Change the spark module dependency in ams/server from compile scope to runtime scope.
baiyangtx authored Oct 9, 2023
1 parent f032423 commit b349310
Showing 3 changed files with 16 additions and 15 deletions.
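
Why the scope change works: with Maven's default `compile` scope, `ams/server` referenced spark-module classes directly (e.g. `ArcticSparkExtensions.class.getName()`), which forces the spark jar onto the compile classpath. The diffs below replace those references with plain string constants, so the classes are only resolved when Spark loads them at session startup, and the dependency can be declared `runtime`. A minimal sketch of the contrast (class name copied from the diffs; the sketch itself is not part of the commit):

public class ScopeSketch {

  // Compile-time coupling: javac must resolve this symbol, so the module
  // providing ArcticSparkExtensions would need compile scope.
  //   String ext = com.netease.arctic.spark.ArcticSparkExtensions.class.getName();

  // Runtime-only coupling: an ordinary string literal. The named class is
  // loaded later by Spark, so runtime scope on the providing module suffices.
  public static final String MIXED_FORMAT_EXTENSION =
      "com.netease.arctic.spark.ArcticSparkExtensions";

  public static void main(String[] args) {
    System.out.println("spark.sql.extensions = " + MIXED_FORMAT_EXTENSION);
  }
}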
ams/server/pom.xml (1 addition, 0 deletions)
@@ -127,6 +127,7 @@
       <groupId>com.netease.amoro</groupId>
       <artifactId>amoro-spark-${terminal.spark.major.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>

     <dependency>
SparkContextUtil.java

@@ -20,30 +20,32 @@

 import com.netease.arctic.server.catalog.CatalogType;
 import com.netease.arctic.server.utils.Configurations;
-import com.netease.arctic.spark.ArcticSparkCatalog;
-import com.netease.arctic.spark.ArcticSparkExtensions;
-import com.netease.arctic.spark.ArcticSparkSessionCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.SparkCatalog;
-import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;

 import java.util.List;
 import java.util.Map;

 public class SparkContextUtil {

+  public static final String ICEBERG_EXTENSION = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
+  public static final String MIXED_FORMAT_EXTENSION = "com.netease.arctic.spark.ArcticSparkExtensions";
+  public static final String ICEBERG_CATALOG = "org.apache.iceberg.spark.SparkCatalog";
+  public static final String MIXED_FORMAT_CATALOG = "com.netease.arctic.spark.ArcticSparkCatalog";
+  public static final String MIXED_FORMAT_SESSION_CATALOG = "com.netease.arctic.spark.ArcticSparkSessionCatalog";
+  public static final String MIXED_FORMAT_PROPERTY_REFRESH_BEFORE_USAGE =
+      "spark.sql.arctic.refresh-catalog-before-usage";
+
   public static Map<String, String> getSparkConf(Configurations sessionConfig) {
     Map<String, String> sparkConf = Maps.newLinkedHashMap();
-    sparkConf.put("spark.sql.extensions", ArcticSparkExtensions.class.getName() +
-        "," + IcebergSparkSessionExtensions.class.getName());
+    sparkConf.put("spark.sql.extensions", MIXED_FORMAT_EXTENSION + "," + ICEBERG_EXTENSION);

     List<String> catalogs = sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.CATALOGS);
     String catalogUrlBase = sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.CATALOG_URL_BASE);

     for (String catalog : catalogs) {
       String connector = sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.catalogConnector(catalog));
       if ("iceberg".equalsIgnoreCase(connector)) {
-        sparkConf.put("spark.sql.catalog." + catalog, SparkCatalog.class.getName());
+        sparkConf.put("spark.sql.catalog." + catalog, ICEBERG_CATALOG);
         Map<String, String> properties =
             TerminalSessionFactory.SessionConfigOptions.getCatalogProperties(sessionConfig, catalog);
         for (String key : properties.keySet()) {
@@ -52,14 +54,14 @@ public static Map<String, String> getSparkConf(Configurations sessionConfig) {
         }
       } else {
         String sparkCatalogPrefix = "spark.sql.catalog." + catalog;
-        String catalogClassName = ArcticSparkCatalog.class.getName();
+        String catalogClassName = MIXED_FORMAT_CATALOG;
         String type =
             sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.catalogProperty(catalog, "type"));
         if (sessionConfig.getBoolean(
             TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE) &&
             CatalogType.HIVE.name().equalsIgnoreCase(type)) {
           sparkCatalogPrefix = "spark.sql.catalog.spark_catalog";
-          catalogClassName = ArcticSparkSessionCatalog.class.getName();
+          catalogClassName = MIXED_FORMAT_SESSION_CATALOG;
         }
         sparkConf.put(sparkCatalogPrefix, catalogClassName);
         sparkConf.put(sparkCatalogPrefix + ".url", catalogUrlBase + "/" + catalog);
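
For illustration, the sketch below prints the kind of map `getSparkConf` now builds for a mixed-format catalog; the catalog name `demo` and the URL base placeholder are hypothetical, not taken from the commit:

import java.util.LinkedHashMap;
import java.util.Map;

public class GetSparkConfSketch {
  public static void main(String[] args) {
    String catalog = "demo";                      // hypothetical catalog name
    String catalogUrlBase = "<catalog-url-base>"; // placeholder, not a real URL

    Map<String, String> sparkConf = new LinkedHashMap<>();
    sparkConf.put("spark.sql.extensions",
        "com.netease.arctic.spark.ArcticSparkExtensions,"
            + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
    sparkConf.put("spark.sql.catalog." + catalog,
        "com.netease.arctic.spark.ArcticSparkCatalog");
    sparkConf.put("spark.sql.catalog." + catalog + ".url",
        catalogUrlBase + "/" + catalog);

    sparkConf.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}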
LocalSessionFactory.java

@@ -23,12 +23,10 @@
 import com.netease.arctic.server.terminal.TerminalSessionFactory;
 import com.netease.arctic.server.utils.ConfigOptions;
 import com.netease.arctic.server.utils.Configurations;
-import com.netease.arctic.spark.ArcticSparkExtensions;
 import com.netease.arctic.table.TableMetaStore;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.internal.SQLConf;
@@ -62,7 +60,7 @@ public TerminalSession create(TableMetaStore metaStore, Configurations configura…
     initializeLogs.add("setup session, session factory: " + LocalSessionFactory.class.getName());

     Map<String, String> sparkConf = SparkContextUtil.getSparkConf(configuration);
-    sparkConf.put(com.netease.arctic.spark.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE, "true");
+    sparkConf.put(SparkContextUtil.MIXED_FORMAT_PROPERTY_REFRESH_BEFORE_USAGE, "true");

     Map<String, String> finallyConf = configuration.toMap();
     catalogs.stream()
@@ -106,8 +104,8 @@ protected synchronized SparkSession lazyInitContext() {
     sparkconf.set(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic");
     sparkconf.set("spark.executor.heartbeatInterval", "100s");
     sparkconf.set("spark.network.timeout", "200s");
-    sparkconf.set("spark.sql.extensions", ArcticSparkExtensions.class.getName() +
-        "," + IcebergSparkSessionExtensions.class.getName());
+    sparkconf.set("spark.sql.extensions", SparkContextUtil.MIXED_FORMAT_EXTENSION +
+        "," + SparkContextUtil.ICEBERG_EXTENSION);

     for (String key : this.conf.keySet()) {
       if (key.startsWith(SPARK_CONF_PREFIX)) {
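
A tradeoff of replacing `X.class.getName()` with string constants is that the compiler no longer checks the class names; a typo in a constant only surfaces when Spark fails to load the extension. A hedged sketch, not part of the commit, of a fail-fast startup check that would catch such a mistake:

public class ClassNameCheck {

  // Throws if a class named by a string constant is absent from the runtime
  // classpath, turning a late Spark failure into an immediate startup error.
  static void requireLoadable(String className) {
    try {
      Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("not on runtime classpath: " + className, e);
    }
  }

  public static void main(String[] args) {
    requireLoadable("com.netease.arctic.spark.ArcticSparkExtensions");
    requireLoadable("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
  }
}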
