Skip to content

Commit

Permalink
Patch arrow for Windows (ray-project#6363)
Browse files Browse the repository at this point in the history
  • Loading branch information
mehrdadn authored and pcmoritz committed Dec 6, 2019
1 parent 6223d2e commit 1710337
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 8 deletions.
31 changes: 23 additions & 8 deletions bazel/BUILD.plasma
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ genrule(
output_to_bindir = 1,
)

cc_library(
name = "ae",
srcs = [
"cpp/src/plasma/thirdparty/ae/ae.c",
],
hdrs = [
"cpp/src/plasma/thirdparty/ae/ae.h",
"cpp/src/plasma/thirdparty/ae/ae_epoll.c",
"cpp/src/plasma/thirdparty/ae/ae_evport.c",
"cpp/src/plasma/thirdparty/ae/ae_kqueue.c",
"cpp/src/plasma/thirdparty/ae/ae_select.c",
"cpp/src/plasma/thirdparty/ae/config.h",
"cpp/src/plasma/thirdparty/ae/zmalloc.h",
],
copts = COPTS,
includes = [
"cpp/src/plasma/thirdparty/ae",
],
strip_include_prefix = "cpp/src",
visibility = ["//visibility:public"],
)

cc_library(
name = "plasma_lib",
srcs = [
Expand All @@ -171,7 +193,6 @@ cc_library(
"cpp/src/plasma/external_store.cc",
"cpp/src/plasma/plasma_allocator.cc",
"cpp/src/plasma/quota_aware_policy.cc",
"cpp/src/plasma/thirdparty/ae/ae.c",
],
hdrs = [
"cpp/src/plasma/events.h",
Expand All @@ -180,19 +201,13 @@ cc_library(
"cpp/src/plasma/plasma_allocator.h",
"cpp/src/plasma/quota_aware_policy.h",
"cpp/src/plasma/store.h",
"cpp/src/plasma/thirdparty/ae/ae.h",
"cpp/src/plasma/thirdparty/ae/ae_epoll.c",
"cpp/src/plasma/thirdparty/ae/ae_evport.c",
"cpp/src/plasma/thirdparty/ae/ae_kqueue.c",
"cpp/src/plasma/thirdparty/ae/ae_select.c",
"cpp/src/plasma/thirdparty/ae/config.h",
"cpp/src/plasma/thirdparty/ae/zmalloc.h",
"cpp/src/plasma/thirdparty/dlmalloc.c",
],
copts = COPTS,
linkopts = LINKOPTS,
strip_include_prefix = "cpp/src",
deps = [
":ae",
":plasma_client",
"@com_github_google_glog//:glog",
],
Expand Down
7 changes: 7 additions & 0 deletions bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ def ray_deps_setup():
commit = "86f34aa07e611787d9cc98c6a33b0a0a536dce57",
remote = "https://github.com/apache/arrow",
sha256 = "4f1956e74188fa15078c8ad560bbc298624320d2aafd21fe7a2511afee7ea841",
patches = [
"//thirdparty/patches:arrow-headers-unused.patch",
"//thirdparty/patches:arrow-windows-export.patch",
"//thirdparty/patches:arrow-windows-poll.patch",
"//thirdparty/patches:arrow-windows-sigpipe.patch",
"//thirdparty/patches:arrow-windows-socket.patch",
],
)

github_repository(
Expand Down
28 changes: 28 additions & 0 deletions thirdparty/patches/arrow-headers-unused.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc
index 5142ee435..5266e3e66 100644
--- cpp/src/plasma/client.cc
+++ cpp/src/plasma/client.cc
@@ -19,10 +19,6 @@

#include "plasma/client.h"

-#ifdef _WIN32
-#include <Win32_Interop/win32_types.h>
-#endif
-
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
diff --git cpp/src/plasma/plasma.h cpp/src/plasma/plasma.h
index 79e33c2f0..c8241b2fe 100644
--- cpp/src/plasma/plasma.h
+++ cpp/src/plasma/plasma.h
@@ -25,7 +25,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <unistd.h> // pid_t

#include <memory>
#include <string>
--
15 changes: 15 additions & 0 deletions thirdparty/patches/arrow-windows-export.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
diff --git cpp/src/arrow/util/logging.cc cpp/src/arrow/util/logging.cc
index e54a10e52..3dbfc7a12 100644
--- cpp/src/arrow/util/logging.cc
+++ cpp/src/arrow/util/logging.cc
@@ -84,7 +84,9 @@ typedef google::LogMessage LoggingProvider;
typedef CerrLog LoggingProvider;
#endif

+#if !defined(_WIN32) || defined(ARROW_STATIC) || defined(ARROW_EXPORTING) || !defined(ARROW_EXPORT)
ArrowLogLevel ArrowLog::severity_threshold_ = ArrowLogLevel::ARROW_INFO;
+#endif
// Keep the log directory.
static std::unique_ptr<std::string> log_dir_;

--
49 changes: 49 additions & 0 deletions thirdparty/patches/arrow-windows-poll.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
diff --git cpp/src/plasma/thirdparty/ae/ae.c cpp/src/plasma/thirdparty/ae/ae.c
index dfb722444..96d9e537a 100644
--- cpp/src/plasma/thirdparty/ae/ae.c
+++ cpp/src/plasma/thirdparty/ae/ae.c
@@ -428,19 +428,33 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* Wait for milliseconds until the given file descriptor becomes
* writable/readable/exception */
int aeWait(int fd, int mask, long long milliseconds) {
- struct pollfd pfd;
+ short revents = 0;
+ struct timeval tv = { milliseconds / 1000, (milliseconds % 1000) * 1000 };
int retmask = 0, retval;

- memset(&pfd, 0, sizeof(pfd));
- pfd.fd = fd;
- if (mask & AE_READABLE) pfd.events |= POLLIN;
- if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
+ fd_set rset, wset;
+ FD_ZERO(&rset);
+ FD_ZERO(&wset);
+ if (mask & AE_READABLE) {
+ FD_SET(fd, &rset);
+ } else if (mask & AE_WRITABLE) {
+ FD_SET(fd, &wset);
+ }
+
+ if ((retval = select(fd + 1, &rset, &wset, NULL, &tv)) > 0) {
+ if (FD_ISSET(fd, &rset)) {
+ revents |= POLLIN;
+ }
+ if (FD_ISSET(fd, &wset)) {
+ revents |= POLLOUT;
+ }
+ }

- if ((retval = poll(&pfd, 1, milliseconds))== 1) {
- if (pfd.revents & POLLIN) retmask |= AE_READABLE;
- if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
- if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
- if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
+ if (retval== 1) {
+ if (revents & POLLIN) retmask |= AE_READABLE;
+ if (revents & POLLOUT) retmask |= AE_WRITABLE;
+ if (revents & POLLERR) retmask |= AE_WRITABLE;
+ if (revents & POLLHUP) retmask |= AE_WRITABLE;
return retmask;
} else {
return retval;
--
17 changes: 17 additions & 0 deletions thirdparty/patches/arrow-windows-sigpipe.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
diff --git cpp/src/plasma/store.cc cpp/src/plasma/store.cc
index 01aabfc7c..876cb59a6 100644
--- cpp/src/plasma/store.cc
+++ cpp/src/plasma/store.cc
@@ -1182,9 +1182,11 @@ void HandleSignal(int signal) {

void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled,
std::shared_ptr<ExternalStore> external_store) {
+#ifndef _WIN32 // TODO(mehrdadn): Is there an equivalent of this we need for Windows?
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
+#endif

g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
--
34 changes: 34 additions & 0 deletions thirdparty/patches/arrow-windows-socket.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc
index 0cb1d81a2..5142ee435 100644
--- cpp/src/plasma/client.cc
+++ cpp/src/plasma/client.cc
@@ -983,8 +983,13 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
// notifications from the Plasma store to the client.
socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
// Make the socket non-blocking.
+#ifdef _WINSOCKAPI_
+ unsigned long value = 1;
+ ARROW_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0);
+#else
int flags = fcntl(sock[1], F_GETFL, 0);
ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
+#endif
// Tell the Plasma store about the subscription.
RETURN_NOT_OK(SendSubscribeRequest(store_conn_));
// Send the file descriptor that the Plasma store should use to push
diff --git cpp/src/plasma/fling.cc cpp/src/plasma/fling.cc
index f0960aab6..2f3997534 100644
--- cpp/src/plasma/fling.cc
+++ cpp/src/plasma/fling.cc
@@ -18,6 +18,10 @@

#include "arrow/util/logging.h"

+#ifdef _WIN32
+#include <ws2tcpip.h> // socklen_t
+#endif
+
void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
iov->iov_base = buf;
iov->iov_len = 1;
--

0 comments on commit 1710337

Please sign in to comment.