Skip to content

Commit

Permalink
Workaround for ZK 3.4->3.5 upgrade (apache#1955)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 13, 2018
1 parent 311817f commit e45d9e3
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 10 deletions.
4 changes: 2 additions & 2 deletions all/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt

Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt
* AspectJ
- org.aspectj-aspectjrt-1.8.9.jar
- org.aspectj-aspectjweaver-1.8.9.jar
- org.aspectj-aspectjrt-1.9.1.jar
- org.aspectj-aspectjweaver-1.9.1.jar

Public Domain (CC0) -- licenses/LICENSE-CC0.txt
* Reactive Streams -- org.reactivestreams-reactive-streams-1.0.0.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ flexible messaging model and an intuitive client API.</description>
<jersey.version>2.25</jersey.version>
<athenz.version>1.7.17</athenz.version>
<prometheus.version>0.0.23</prometheus.version>
<aspectj.version>1.8.9</aspectj.version>
<aspectj.version>1.9.1</aspectj.version>
<rocksdb.version>5.13.1</rocksdb.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j2.version>2.10.0</log4j2.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ void start() throws Exception {

log.debug("--- setup PulsarStandaloneStarter ---");

// load aspectj-weaver agent for instrumentation
AgentLoader.loadAgentClass(Agent.class.getName(), null);

if (!onlyBroker) {
// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData, config.getAdvertisedAddress());
Expand All @@ -175,9 +178,6 @@ void start() throws Exception {
return;
}

// load aspectj-weaver agent for instrumentation
AgentLoader.loadAgentClass(Agent.class.getName(), null);

// initialize the functions worker
if (!noFunctionsWorker) {
WorkerConfig workerConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.zookeeper;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

@Slf4j
public class FileTxnSnapLogWrapper extends FileTxnSnapLog {

public FileTxnSnapLogWrapper(FileTxnSnapLog src) throws IOException {
this(src.getDataDir(), src.getSnapDir());
}

public FileTxnSnapLogWrapper(File dataDir, File snapDir) throws IOException {
super(dataDir, snapDir);
}

@Override
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
try {
return super.restore(dt, sessions, listener);
} catch (IOException e) {
if ("No snapshot found, but there are log entries. Something is broken!".equals(e.getMessage())) {
log.info("Ignoring exception for missing ZK db");
// Ignore error when snapshot is not found. This is needed when upgrading ZK from 3.4 to 3.5
// https://issues.apache.org/jira/browse/ZOOKEEPER-3056
save(dt, (ConcurrentHashMap<Long, Integer>) sessions);

/* return a zxid of zero, since we the database is empty */
return 0;
} else {
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,18 @@ private void runZookeeper(int maxCC) throws IOException {
try {
// Allow all commands on ZK control port
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
zks = new ZooKeeperServer(new FileTxnSnapLogWrapper(zkDataDir, zkDataDir));

serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(ZooKeeperDefaultPort), maxCC);
serverFactory.startup(zks);
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);

if (serverFactory != null) {
serverFactory.shutdown();
}
throw new IOException(e);
}

boolean b = waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
Expand Down
7 changes: 6 additions & 1 deletion pulsar-zookeeper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-zookeeper-utils</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
*/
package org.apache.pulsar.zookeeper;

import io.prometheus.client.Gauge;

import java.util.Arrays;

import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;

import io.prometheus.client.Gauge;

/**
* Instruments ZooKeeperServer to enable stats reporting on data set and z-node sizess
*/
Expand All @@ -37,6 +42,17 @@ public class ZooKeeperServerAspect {
public void zkServerConstructorPointCut() {
}

@Around("zkServerConstructorPointCut()")
public void zkServerConstructorBefore(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
if (args[0] instanceof FileTxnSnapLog) {
// Wrap FileTxnSnapLog argument
args[0] = new FileTxnSnapLogWrapper((FileTxnSnapLog)args[0]);
}

joinPoint.proceed(args);
}

@After("zkServerConstructorPointCut()")
public void zkServerConstructor(JoinPoint joinPoint) throws Throwable {
// ZooKeeperServer instance was created
Expand Down

0 comments on commit e45d9e3

Please sign in to comment.