Skip to content

Commit

Permalink
[hotfix][coordination] Wire ZooKeeperJobGraphStore to the new interface
Browse files Browse the repository at this point in the history
We introduce three specific composition implementations for ZooKeeper: ZooKeeperJobGraphStoreWatcher, ZooKeeperJobGraphStoreEventHandler, and ZooKeeperJobGraphStoreUtil.
* ZooKeeperJobGraphStoreWatcher is a watcher on the JobGraphStore. It monitors all changes to the job graph store and notifies the DefaultJobGraphStore via JobGraphStore.JobGraphListener.
* ZooKeeperJobGraphStoreUtil is a utility class that converts a ZooKeeper path to a JobID, and vice versa.
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Nov 7, 2020
1 parent 90df930 commit c3a6b51
Show file tree
Hide file tree
Showing 9 changed files with 425 additions and 605 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.util.ZooKeeperUtils;

/**
 * ZooKeeper flavour of {@link JobGraphStoreUtil}, exposed as an enum singleton.
 *
 * <p>Translates between a {@link JobID} and the name under which the job is addressed.
 */
public enum ZooKeeperJobGraphStoreUtil implements JobGraphStoreUtil {

    /** The only instance of this utility. */
    INSTANCE;

    /**
     * Maps a job ID to its name by delegating to {@link ZooKeeperUtils#getPathForJob(JobID)}.
     *
     * @param jobId the job ID to convert
     * @return the value produced by {@code ZooKeeperUtils.getPathForJob} for {@code jobId}
     */
    @Override
    public String jobIDToName(final JobID jobId) {
        final String pathForJob = ZooKeeperUtils.getPathForJob(jobId);
        return pathForJob;
    }

    /**
     * Maps a name back to a job ID by parsing it as a hex string.
     *
     * <p>The name is handed to {@link JobID#fromHexString(String)} unchanged; nothing is
     * stripped or normalized here before parsing.
     *
     * @param name the hex string representation of a job ID
     * @return the parsed job ID
     */
    @Override
    public JobID nameToJobID(final String name) {
        final JobID parsedJobId = JobID.fromHexString(name);
        return parsedJobId;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.api.common.JobID;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * ZooKeeper based {@link JobGraphStoreWatcher}.
 *
 * <p>One ZNode exists per job graph:
 * <pre>
 * +----O /flink/jobgraphs/&lt;job-id&gt; 1 [persistent]
 * .
 * .
 * .
 * +----O /flink/jobgraphs/&lt;job-id&gt; N [persistent]
 * </pre>
 *
 * <p>The children of the root path are observed so that concurrent modifications, which can
 * happen in corner cases where several instances operate at the same time, are noticed. The
 * registered {@link JobGraphStore.JobGraphListener} is informed about added and removed children.
 */
public class ZooKeeperJobGraphStoreWatcher implements JobGraphStoreWatcher {

    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperJobGraphStoreWatcher.class);

    /**
     * Children cache used to observe every child node, which allows detecting races with other
     * instances that work on the same state.
     */
    private final PathChildrenCache pathCache;

    /** Receiver of added/removed notifications; assigned in {@link #start}. */
    private JobGraphStore.JobGraphListener jobGraphListener;

    /** Set to true by {@link #start} and reset to false by {@link #stop()}. */
    private volatile boolean running;

    /**
     * Creates the watcher and registers its event listener on the given cache. The cache itself
     * is not started here; that happens in {@link #start}.
     *
     * @param pathCache the children cache to listen on; must not be null
     */
    public ZooKeeperJobGraphStoreWatcher(PathChildrenCache pathCache) {
        this.pathCache = checkNotNull(pathCache);
        this.pathCache.getListenable().addListener(new JobGraphsPathCacheListener());
        running = false;
    }

    /**
     * Stores the listener, marks the watcher as running and starts the underlying cache.
     *
     * @param jobGraphListener the listener to notify; must not be null
     * @throws Exception if starting the cache fails
     */
    @Override
    public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
        // The listener has to be in place before the cache is started.
        this.jobGraphListener = checkNotNull(jobGraphListener);
        running = true;
        pathCache.start();
    }

    /**
     * Closes the underlying cache if the watcher is running; otherwise does nothing.
     *
     * @throws Exception if closing the cache fails
     */
    @Override
    public void stop() throws Exception {
        if (running) {
            running = false;

            LOG.info("Stopping ZooKeeperJobGraphStoreWatcher ");
            pathCache.close();
        }
    }

    /**
     * Listener on the children cache that forwards changes to the job graph listener.
     *
     * <p>It detects modifications made by other job managers in corner cases. Notifications
     * are also emitted for changes made by this job manager.
     */
    private final class JobGraphsPathCacheListener implements PathChildrenCacheListener {

        /**
         * Dispatches on the event type: added/removed children are forwarded to the listener,
         * connection state changes and initialization are only logged, updates are ignored.
         */
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            traceEvent(event);

            switch (event.getType()) {
                case CHILD_ADDED:
                    final JobID addedJobId = fromEvent(event);
                    LOG.debug("Received CHILD_ADDED event notification for job {}", addedJobId);
                    jobGraphListener.onAddedJobGraph(addedJobId);
                    break;

                case CHILD_UPDATED:
                    // Nothing to do
                    break;

                case CHILD_REMOVED:
                    final JobID removedJobId = fromEvent(event);
                    LOG.debug("Received CHILD_REMOVED event notification for job {}", removedJobId);
                    jobGraphListener.onRemovedJobGraph(removedJobId);
                    break;

                case CONNECTION_SUSPENDED:
                    LOG.warn("ZooKeeper connection SUSPENDING. Changes to the submitted job " +
                        "graphs are not monitored (temporarily).");
                    break;

                case CONNECTION_LOST:
                    LOG.warn("ZooKeeper connection LOST. Changes to the submitted job " +
                        "graphs are not monitored (permanently).");
                    break;

                case CONNECTION_RECONNECTED:
                    LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job " +
                        "graphs are monitored again.");
                    break;

                case INITIALIZED:
                    LOG.info("JobGraphsPathCacheListener initialized");
                    break;
            }
        }

        /**
         * Writes a debug line for the received event, including the path when the event
         * carries data. Does nothing unless debug logging is enabled.
         */
        private void traceEvent(PathChildrenCacheEvent event) {
            if (!LOG.isDebugEnabled()) {
                return;
            }

            if (event.getData() == null) {
                LOG.debug("Received {} event", event.getType());
            } else {
                LOG.debug("Received {} event (path: {})", event.getType(), event.getData().getPath());
            }
        }

        /**
         * Derives the JobID from the event: the node name of the event's path is parsed as a
         * hex string.
         */
        private JobID fromEvent(PathChildrenCacheEvent event) {
            final String eventPath = event.getData().getPath();
            final String nodeName = ZKPaths.getNodeFromPath(eventPath);
            return JobID.fromHexString(nodeName);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
Expand Down Expand Up @@ -294,14 +297,15 @@ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFac
}

/**
* Creates a {@link ZooKeeperJobGraphStore} instance.
* Creates a {@link DefaultJobGraphStore} instance with {@link ZooKeeperStateHandleStore},
* {@link ZooKeeperJobGraphStoreWatcher} and {@link ZooKeeperJobGraphStoreUtil}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object
* @return {@link ZooKeeperJobGraphStore} instance
* @return {@link DefaultJobGraphStore} instance
* @throws Exception if the submitted job graph store cannot be created
*/
public static ZooKeeperJobGraphStore createJobGraphs(
public static JobGraphStore createJobGraphs(
CuratorFramework client,
Configuration configuration) throws Exception {

Expand All @@ -326,10 +330,10 @@ public static ZooKeeperJobGraphStore createJobGraphs(

final PathChildrenCache pathCache = new PathChildrenCache(facade, "/", false);

return new ZooKeeperJobGraphStore(
zooKeeperFullJobsPath,
return new DefaultJobGraphStore<>(
zooKeeperStateHandleStore,
pathCache);
new ZooKeeperJobGraphStoreWatcher(pathCache),
ZooKeeperJobGraphStoreUtil.INSTANCE);
}

/**
Expand Down Expand Up @@ -359,7 +363,7 @@ public static CompletedCheckpointStore createCompletedCheckpoints(
configuration,
HA_STORAGE_COMPLETED_CHECKPOINT);

checkpointsPath += ZooKeeperJobGraphStore.getPathForJob(jobId);
checkpointsPath += getPathForJob(jobId);

final ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
maxNumberOfCheckpointsToRetain,
Expand All @@ -370,6 +374,15 @@ public static CompletedCheckpointStore createCompletedCheckpoints(
return zooKeeperCompletedCheckpointStore;
}


/**
 * Builds the path segment for a job: a leading slash followed by the string form of the job ID.
 *
 * @param jobId the job ID; must not be null
 * @return the job ID as a String, prefixed with a slash
 */
public static String getPathForJob(JobID jobId) {
    checkNotNull(jobId, "Job ID");
    return "/" + jobId;
}

/**
* Creates an instance of {@link ZooKeeperStateHandleStore}.
*
Expand Down Expand Up @@ -404,7 +417,7 @@ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
String checkpointIdCounterPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);

checkpointIdCounterPath += ZooKeeperJobGraphStore.getPathForJob(jobId);
checkpointIdCounterPath += getPathForJob(jobId);

return new ZooKeeperCheckpointIDCounter(client, checkpointIdCounterPath, new DefaultLastStateConnectionStateListener());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -159,7 +158,8 @@ public RetrievableStateHandle<T> addAndLock(
return storeHandle;
}
catch (KeeperException.NodeExistsException e) {
throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
throw new AlreadyExistException("ZooKeeper node " + path + " already exists.", e);
}
finally {
if (!success) {
Expand Down Expand Up @@ -202,7 +202,8 @@ public void replace(String pathInZooKeeper, IntegerResourceVersion expectedVersi
.forPath(path, serializedStateHandle);
success = true;
} catch (KeeperException.NoNodeException e) {
throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", e);
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
throw new NotExistException("ZooKeeper node " + path + " does not exist.", e);
} finally {
if (success) {
oldStateHandle.discardState();
Expand Down Expand Up @@ -492,6 +493,10 @@ private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) thro
client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
} catch (KeeperException.NodeExistsException ignored) {
// we have already created the lock
} catch (KeeperException.NoNodeException ex) {
// We could run into this exception because the parent node does not exist when we are trying to lock.
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
throw new NotExistException("ZooKeeper node " + path + " does not exist.", ex);
}
}

Expand All @@ -500,18 +505,19 @@ private RetrievableStateHandle<T> get(String pathInZooKeeper, boolean lock) thro
try {
byte[] data = client.getData().forPath(path);

try {
RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
data,
Thread.currentThread().getContextClassLoader());
RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
data,
Thread.currentThread().getContextClassLoader());

success = true;
success = true;

return retrievableStateHandle;
} catch (IOException | ClassNotFoundException e) {
throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
path + '.', e);
}
return retrievableStateHandle;
} catch (KeeperException.NoNodeException ex) {
// We wrap the exception here so that it could be caught in DefaultJobGraphStore
throw new NotExistException("ZooKeeper node " + path + " does not exist.", ex);
} catch (IOException | ClassNotFoundException e) {
throw new IOException("Failed to deserialize state handle from ZooKeeper data from " +
path + '.', e);
} finally {
if (!success && lock) {
// release the lock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
Expand Down Expand Up @@ -192,7 +192,7 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception {
dispatcherLeaderElectionService.notLeader();

// check that the job has been removed from ZooKeeper
final ZooKeeperJobGraphStore submittedJobGraphStore = createZooKeeperJobGraphStore(client);
final JobGraphStore submittedJobGraphStore = createZooKeeperJobGraphStore(client);

CommonTestUtils.waitUntilCondition(() -> submittedJobGraphStore.getJobIds().isEmpty(), Deadline.fromNow(VERIFICATION_TIMEOUT), 20L);
}
Expand All @@ -217,7 +217,7 @@ private DispatcherRunner createDispatcherRunner(
partialDispatcherServices);
}

private ZooKeeperJobGraphStore createZooKeeperJobGraphStore(CuratorFramework client) {
private JobGraphStore createZooKeeperJobGraphStore(CuratorFramework client) {
try {
return ZooKeeperUtils.createJobGraphs(client, configuration);
} catch (Exception e) {
Expand Down
Loading

0 comments on commit c3a6b51

Please sign in to comment.