Skip to content

Commit

Permalink
[FLINK-21506][mesos] Option for mesos task user
Browse files Browse the repository at this point in the history
  • Loading branch information
qinghui-xu authored Mar 23, 2021
1 parent 465abe7 commit d2bbc88
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,11 @@
<td>String</td>
<td>A comma separated list of URIs of custom artifacts to be downloaded into the sandbox of Mesos workers.</td>
</tr>
<tr>
<td><h5>mesos.resourcemanager.tasks.user</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Unix user which mesos tasks should run as.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation al
cmd.addUris(CommandInfo.URI.newBuilder().setValue(uri));
}

// set unix user for mesos tasks
if (params.user().isDefined()) {
cmd.setUser(params.user().get());
}

// propagate environment variables
for (Map.Entry<String, String> entry :
params.containeredParameters().taskManagerEnv().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public class MesosTaskManagerParameters {
"A comma separated list of URIs of custom artifacts to be downloaded into the sandbox"
+ " of Mesos workers.");

public static final ConfigOption<String> MESOS_TM_USER =
key("mesos.resourcemanager.tasks.user")
.stringType()
.noDefaultValue()
.withDescription("Unix user which mesos tasks should run as.");

public static final ConfigOption<Map<String, String>> MESOS_TM_LABELS =
key("mesos.resourcemanager.tasks.labels")
.mapType()
Expand Down Expand Up @@ -214,6 +220,8 @@ public class MesosTaskManagerParameters {

private final List<String> uris;

private final Option<String> user;

private final Map<String, String> mesosLabels;

public MesosTaskManagerParameters(
Expand All @@ -231,6 +239,7 @@ public MesosTaskManagerParameters(
Option<String> bootstrapCommand,
Option<String> taskManagerHostname,
List<String> uris,
Option<String> user,
Map<String, String> mesosLabels) {

this.gpus = gpus;
Expand All @@ -247,6 +256,7 @@ public MesosTaskManagerParameters(
this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
this.uris = Preconditions.checkNotNull(uris);
this.user = Preconditions.checkNotNull(user);
this.mesosLabels = Preconditions.checkNotNull(mesosLabels);
}

Expand Down Expand Up @@ -333,6 +343,11 @@ public List<String> uris() {
return uris;
}

/** Get the unix user as which mesos tasks run. */
public Option<String> user() {
return user;
}

/** Get mesos task labels. */
public Map<String, String> mesosLabels() {
return mesosLabels;
Expand Down Expand Up @@ -442,6 +457,9 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
Option<String> tmBootstrapCommand =
Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));

// obtain unix user that's running mesos tasks
Option<String> user = Option.apply(flinkConfig.getString(MESOS_TM_USER));

// obtain mesos task labels from configuration
Map<String, String> mesosLabels =
flinkConfig.getOptional(MESOS_TM_LABELS).orElse(Collections.emptyMap());
Expand All @@ -461,6 +479,7 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
tmBootstrapCommand,
taskManagerHostname,
uris,
user,
mesosLabels);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ private MesosTaskManagerParameters generateMesosTaskManagerParameters() {
Option.empty(),
Option.empty(),
Collections.emptyList(),
Option.empty(),
Collections.emptyMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,14 @@ public void testMesosLabelsConfiguration() {
assertEquals(expectedLabels, params.mesosLabels());
}

@Test
public void testTaskUserConfiguration() {
Configuration conf = getConfiguration();
conf.setString(MesosTaskManagerParameters.MESOS_TM_USER, "flink");
MesosTaskManagerParameters params = MesosTaskManagerParameters.create(conf);
assertEquals("flink", params.user().get());
}

private static Configuration getConfiguration() {
Configuration config = new Configuration();
config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, TOTAL_PROCESS_MEMORY_SIZE);
Expand Down

0 comments on commit d2bbc88

Please sign in to comment.