Skip to content

Commit 93ff3fd

Browse files
committed
[WIP] Rework configuration (#79)
1 parent cb1a35a commit 93ff3fd

5 files changed

Lines changed: 243 additions & 86 deletions

File tree

server/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ edition = "2018"
77

88
[dependencies]
99
jemallocator = "0.1.8"
10-
# timely = { version = "0.9.0", features = ["bincode"] }
11-
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", features = ["bincode"] }
12-
# differential-dataflow = { version = "0.9.0" }
13-
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" }
10+
timely = { version = "0.10.0", features = ["bincode"] }
11+
differential-dataflow = { version = "0.10.0" }
1412
declarative-dataflow = { path = "../", features = ["serde_json", "uuid"] }
1513
serde = "1"
1614
serde_derive = "1"

server/src/main.rs

Lines changed: 184 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ extern crate serde_derive;
77
extern crate log;
88

99
use std::collections::{HashSet, VecDeque};
10+
use std::fs::File;
11+
use std::io::{BufRead, BufReader, Read};
1012
use std::time::{Duration, Instant};
1113

12-
use getopts::Options;
13-
1414
use timely::dataflow::channels::pact::{Exchange, Pipeline};
1515
use timely::dataflow::operators::generic::OutputHandle;
1616
use timely::dataflow::operators::{Operator, Probe};
@@ -20,7 +20,8 @@ use timely::synchronization::Sequencer;
2020
use differential_dataflow::logging::DifferentialEvent;
2121
use differential_dataflow::operators::Consolidate;
2222

23-
use declarative_dataflow::server::{scheduler, Config, CreateAttribute, Request, Server, TxId};
23+
use declarative_dataflow::server;
24+
use declarative_dataflow::server::{scheduler, CreateAttribute, Request, Server, TxId};
2425
use declarative_dataflow::sinks::{Sinkable, SinkingContext};
2526
use declarative_dataflow::timestamp::{Coarsen, Time};
2627
use declarative_dataflow::{Output, ResultDiff};
@@ -42,6 +43,168 @@ use declarative_dataflow::timestamp::pair::Pair;
4243
#[cfg(feature = "bitemporal")]
4344
type T = Pair<Duration, u64>;
4445

46+
#[derive(Debug, Clone)]
47+
struct Configuration {
48+
/// Port at which client connections should be accepted.
49+
pub port: u16,
50+
/// File from which to read server configuration.
51+
pub config: Option<String>,
52+
/// Number of threads to use.
53+
pub threads: usize,
54+
/// Number of processes to expect over the entire cluster.
55+
pub processes: usize,
56+
/// Host addresses.
57+
pub addresses: Vec<String>,
58+
/// ID of this process within the cluster.
59+
pub timely_pid: usize,
60+
/// Whether to report connection progress.
61+
pub report: bool,
62+
}
63+
64+
impl Default for Configuration {
65+
fn default() -> Self {
66+
Configuration {
67+
port: 6262,
68+
config: None,
69+
threads: 1,
70+
processes: 1,
71+
addresses: vec!["localhost:2101".to_string()],
72+
timely_pid: 0,
73+
report: false,
74+
}
75+
}
76+
}
77+
78+
impl Configuration {
79+
/// Returns a `getopts::Options` struct describing all available
80+
/// configuration options.
81+
pub fn options() -> getopts::Options {
82+
let mut opts = getopts::Options::new();
83+
84+
opts.optopt("", "port", "server port", "PORT");
85+
opts.optopt("", "config", "server configuration file", "FILE");
86+
87+
// Timely arguments.
88+
opts.optopt(
89+
"w",
90+
"threads",
91+
"number of per-process worker threads",
92+
"NUM",
93+
);
94+
opts.optopt("p", "process", "identity of this process", "IDX");
95+
opts.optopt("n", "processes", "number of processes", "NUM");
96+
opts.optopt(
97+
"h",
98+
"hostfile",
99+
"text file whose lines are process addresses",
100+
"FILE",
101+
);
102+
opts.optflag("r", "report", "reports connection progress");
103+
104+
opts
105+
}
106+
107+
/// Parses configuration options from the provided arguments.
108+
pub fn from_args<I: Iterator<Item = String>>(args: I) -> Self {
109+
let default: Self = Default::default();
110+
let opts = Self::options();
111+
112+
let matches = opts.parse(args).expect("failed to parse arguments");
113+
114+
let port = matches
115+
.opt_str("port")
116+
.map(|x| x.parse().expect("failed to parse port"))
117+
.unwrap_or(default.port);
118+
119+
let threads = matches
120+
.opt_str("w")
121+
.map(|x| x.parse().expect("failed to parse threads"))
122+
.unwrap_or(default.threads);
123+
124+
let timely_pid = matches
125+
.opt_str("p")
126+
.map(|x| x.parse().expect("failed to parse process id"))
127+
.unwrap_or(default.timely_pid);
128+
129+
let processes = matches
130+
.opt_str("n")
131+
.map(|x| x.parse().expect("failed to parse processes"))
132+
.unwrap_or(default.processes);
133+
134+
let mut addresses = Vec::new();
135+
if let Some(hosts) = matches.opt_str("h") {
136+
let reader = BufReader::new(File::open(hosts.clone()).unwrap());
137+
for x in reader.lines().take(processes) {
138+
addresses.push(x.unwrap());
139+
}
140+
if addresses.len() < processes {
141+
panic!(
142+
"could only read {} addresses from {}, but -n: {}",
143+
addresses.len(),
144+
hosts,
145+
processes
146+
);
147+
}
148+
} else {
149+
for index in 0..processes {
150+
addresses.push(format!("localhost:{}", 2101 + index));
151+
}
152+
}
153+
154+
assert!(processes == addresses.len());
155+
assert!(timely_pid < processes);
156+
157+
let report = matches.opt_present("report");
158+
159+
Self {
160+
port,
161+
config: matches.opt_str("config"),
162+
threads,
163+
processes,
164+
addresses,
165+
timely_pid,
166+
report,
167+
}
168+
}
169+
}
170+
171+
impl Into<server::Configuration> for Configuration {
172+
fn into(self) -> server::Configuration {
173+
match self.config {
174+
None => server::Configuration::default(),
175+
Some(ref path) => {
176+
let mut config_file =
177+
File::open(path).expect("failed to open server configuration file");
178+
179+
let mut contents = String::new();
180+
config_file
181+
.read_to_string(&mut contents)
182+
.expect("failed to read configuration file");
183+
184+
serde_json::from_str(&contents).expect("failed to parse configuration")
185+
}
186+
}
187+
}
188+
}
189+
190+
impl Into<timely::Configuration> for Configuration {
191+
fn into(self) -> timely::Configuration {
192+
if self.processes > 1 {
193+
timely::Configuration::Cluster {
194+
threads: self.threads,
195+
process: self.timely_pid,
196+
addresses: self.addresses,
197+
report: self.report,
198+
log_fn: Box::new(|_| None),
199+
}
200+
} else if self.threads > 1 {
201+
timely::Configuration::Process(self.threads)
202+
} else {
203+
timely::Configuration::Thread
204+
}
205+
}
206+
}
207+
45208
/// A mutation of server state.
46209
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize, Debug)]
47210
struct Command {
@@ -58,46 +221,15 @@ struct Command {
58221
fn main() {
59222
env_logger::init();
60223

61-
let args: Vec<String> = std::env::args().collect();
62-
let timely_args = std::env::args().take_while(|ref arg| *arg != "--");
63-
let timely_config = timely::Configuration::from_args(timely_args).unwrap();
224+
let config = Configuration::from_args(std::env::args());
225+
let timely_config: timely::Configuration = config.clone().into();
226+
let server_config: server::Configuration = config.clone().into();
64227

65228
timely::execute(timely_config, move |worker| {
66-
// Read configuration.
67-
let config = {
68-
let server_args = args.iter().rev().take_while(|arg| *arg != "--");
69-
let default_config: Config = Default::default();
70-
71-
let opts = options();
72-
73-
match opts.parse(server_args) {
74-
Err(err) => panic!(err),
75-
Ok(matches) => {
76-
let starting_port = matches
77-
.opt_str("port")
78-
.map(|x| x.parse().expect("failed to parse port"))
79-
.unwrap_or(default_config.port);
80-
81-
let tick: Option<Duration> = matches
82-
.opt_str("tick")
83-
.map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration")));
84-
85-
Config {
86-
port: starting_port + (worker.index() as u16),
87-
tick,
88-
manual_advance: matches.opt_present("manual-advance"),
89-
enable_logging: matches.opt_present("enable-logging"),
90-
enable_optimizer: matches.opt_present("enable-optimizer"),
91-
enable_meta: matches.opt_present("enable-meta"),
92-
}
93-
}
94-
}
95-
};
96-
97229
// Initialize server state (no networking).
98-
let mut server = Server::<T, Token>::new_at(config.clone(), worker.timer());
230+
let mut server = Server::<T, Token>::new_at(server_config.clone(), worker.timer());
99231

100-
if config.enable_logging {
232+
if server_config.enable_logging {
101233
#[cfg(feature = "real-time")]
102234
server.enable_logging(worker).unwrap();
103235
}
@@ -119,7 +251,7 @@ fn main() {
119251

120252
// Kickoff ticking, if configured. We only want to issue ticks
121253
// from a single worker, to avoid redundant ticking.
122-
if worker.index() == 0 && config.tick.is_some() {
254+
if worker.index() == 0 && server_config.tick.is_some() {
123255
sequencer.push(Command {
124256
owner: 0,
125257
client: SYSTEM.0,
@@ -138,12 +270,18 @@ fn main() {
138270
};
139271

140272
info!(
141-
"[WORKER {}] running with config {:?}, {} peers",
273+
"[W{}] running with config {:?}, {} peers",
142274
worker.index(),
143275
config,
144276
worker.peers(),
145277
);
146278

279+
info!(
280+
"[W{}] running with server_config {:?}",
281+
worker.index(),
282+
server_config,
283+
);
284+
147285
// Sequence counter for commands.
148286
let mut next_tx: TxId = 0;
149287

@@ -218,7 +356,7 @@ fn main() {
218356
// Count-up sequence numbers.
219357
next_tx += 1;
220358

221-
info!("[WORKER {}] {} requests by client {} at {}", worker.index(), command.requests.len(), command.client, next_tx);
359+
trace!("[W{}] {} requests by client {} at {}", worker.index(), command.requests.len(), command.client, next_tx);
222360

223361
let owner = command.owner;
224362
let client = command.client;
@@ -228,7 +366,7 @@ fn main() {
228366

229367
// @TODO only create a single dataflow, but only if req != Transact
230368

231-
trace!("[WORKER {}] {:?}", worker.index(), req);
369+
trace!("[W{}] {:?}", worker.index(), req);
232370

233371
let result = match req {
234372
Request::Transact(req) => server.transact(req, owner, worker.index()),
@@ -380,7 +518,7 @@ fn main() {
380518
// We only want to issue ticks from a single worker, to avoid
381519
// redundant ticking.
382520
if worker.index() == 0 {
383-
if let Some(tick) = config.tick {
521+
if let Some(tick) = server_config.tick {
384522
let interval_end = Instant::now().duration_since(worker.timer()).coarsen(&tick);
385523
let at = worker.timer() + interval_end;
386524
server.scheduler.borrow_mut().event_at(at, scheduler::Event::Tick);
@@ -410,7 +548,7 @@ fn main() {
410548
}
411549
}
412550

413-
if !config.manual_advance {
551+
if !server_config.manual_advance {
414552
#[cfg(all(not(feature = "real-time"), not(feature = "bitemporal")))]
415553
let next = next_tx as u64;
416554
#[cfg(feature = "real-time")]
@@ -443,7 +581,7 @@ fn main() {
443581
worker.step_or_park(Some(delay));
444582
}
445583

446-
info!("[WORKER {}] shutting down", worker.index());
584+
info!("[W{}] shutting down", worker.index());
447585

448586
drop(sequencer);
449587

@@ -453,25 +591,3 @@ fn main() {
453591

454592
}).expect("Timely computation did not exit cleanly");
455593
}
456-
457-
fn options() -> Options {
458-
let mut opts = Options::new();
459-
opts.optopt("", "port", "server port", "PORT");
460-
opts.optopt(
461-
"",
462-
"tick",
463-
"advance domain at a regular interval",
464-
"SECONDS",
465-
);
466-
opts.optflag(
467-
"",
468-
"manual-advance",
469-
"forces clients to call AdvanceDomain explicitely",
470-
);
471-
opts.optflag("", "enable-logging", "enable log event sources");
472-
opts.optflag("", "enable-history", "enable historical queries");
473-
opts.optflag("", "enable-optimizer", "enable WCO queries");
474-
opts.optflag("", "enable-meta", "enable queries on the query graph");
475-
476-
opts
477-
}

0 commit comments

Comments
 (0)