Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 14d4ef5

Browse files
authored
Merge pull request #67 from TheTorProject/tor-conn-fixes
Refactor Tor connection and configuration code
2 parents ff86b30 + 78b0c8c commit 14d4ef5

11 files changed

Lines changed: 121 additions & 138 deletions

bwscanner/attacher.py

Lines changed: 73 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,9 @@
1-
import sys
2-
import itertools
3-
4-
from twisted.internet import defer, reactor
1+
import txtorcon
2+
from twisted.internet import defer, reactor, endpoints
53
from txtorcon.interface import CircuitListenerMixin, IStreamAttacher, StreamListenerMixin
6-
from txtorcon import TorState, launch_tor
7-
from txtorcon.util import available_tcp_port
84
from zope.interface import implementer
95

10-
11-
FETCH_ALL_DESCRIPTOR_OPTIONS = {
12-
'UseMicroDescriptors': 0,
13-
'FetchUselessDescriptors': 1,
14-
'FetchDirInfoEarly': 1,
15-
'FetchDirInfoExtraEarly': 1,
16-
}
6+
from bwscanner.logger import log
177

188

199
@implementer(IStreamAttacher)
@@ -118,47 +108,80 @@ def stream_closed(self, *args, **kw):
118108
self.circ.close(ifUnused=True)
119109

120110

121-
def start_tor(config):
111+
def options_need_new_consensus(tor_config, new_options):
122112
"""
123-
Launches tor with random TCP ports chosen for SocksPort and ControlPort,
124-
and other options specified by a txtorcon.torconfig.TorConfig instance.
125-
126-
Returns a deferred that calls back with a txtorcon.torstate.TorState
127-
instance.
113+
Check if we need to wait for a new consensus after updating
114+
the Tor config with the new options.
128115
"""
129-
def get_random_tor_ports():
130-
d2 = available_tcp_port(reactor)
131-
d2.addCallback(lambda port: config.__setattr__('SocksPort', port))
132-
d2.addCallback(lambda _: available_tcp_port(reactor))
133-
d2.addCallback(lambda port: config.__setattr__('ControlPort', port))
134-
return d2
116+
if "UseMicroDescriptors" in new_options:
117+
if tor_config.UseMicroDescriptors != new_options["UseMicroDescriptors"]:
118+
log.debug("Changing UseMicroDescriptors from {current} to {new}.",
119+
current=tor_config.UseMicroDescriptors,
120+
new=new_options["UseMicroDescriptors"])
121+
return True
122+
return False
123+
135124

136-
def launch_and_get_state(ignore):
137-
d2 = launch_tor(config, reactor, stdout=sys.stdout)
138-
d2.addCallback(lambda tpp: TorState(tpp.tor_protocol).post_bootstrap)
139-
return d2
140-
return get_random_tor_ports().addCallback(launch_and_get_state)
125+
def wait_for_newconsensus(tor_state):
126+
got_consensus = defer.Deferred()
141127

128+
def got_newconsensus(event):
129+
log.debug("Got NEWCONSENSUS event: {event}", event=event)
130+
got_consensus.callback(event)
131+
tor_state.protocol.remove_event_listener('NEWCONSENSUS', got_newconsensus)
142132

143-
def update_tor_config(tor, config):
133+
tor_state.protocol.add_event_listener('NEWCONSENSUS', got_newconsensus)
134+
return got_consensus
135+
136+
137+
@defer.inlineCallbacks
138+
def connect_to_tor(launch_tor, circuit_build_timeout, control_port=None,
139+
tor_overrides=None):
144140
"""
145-
Update the Tor config from a dict of config key: value pairs.
141+
Launch or connect to a Tor instance
142+
143+
Configure Tor with the passed options and return a Deferred
146144
"""
147-
config_pairs = [(key, value) for key, value in config.items()]
148-
d = tor.protocol.set_conf(*itertools.chain.from_iterable(config_pairs))
149-
return d.addCallback(lambda result: tor)
150-
151-
152-
def setconf_singleport_exit(tor):
153-
port = available_tcp_port(reactor)
154-
155-
def add_single_port_exit(port):
156-
tor.protocol.set_conf('PublishServerDescriptor', '0',
157-
'PortForwarding', '1',
158-
'AssumeReachable', '1',
159-
'ClientRejectInternalAddresses', '0',
160-
'OrPort', 'auto',
161-
'ExitPolicyRejectPrivate', '0',
162-
'ExitPolicy', 'accept 127.0.0.1:{}, reject *:*'.format(port))
163-
return port.addCallback(add_single_port_exit).addCallback(
164-
lambda ign: tor.routers[tor.protocol.get_info("fingerprint")])
145+
# Options for spawned or running Tor to load the correct descriptors.
146+
tor_options = {
147+
'LearnCircuitBuildTimeout': 0, # Disable adaptive circuit timeouts.
148+
'CircuitBuildTimeout': circuit_build_timeout,
149+
'UseEntryGuards': 0, # Disable UseEntryGuards to avoid PathBias warnings.
150+
'UseMicroDescriptors': 0,
151+
'FetchUselessDescriptors': 1,
152+
'FetchDirInfoEarly': 1,
153+
'FetchDirInfoExtraEarly': 1,
154+
}
155+
156+
if tor_overrides:
157+
tor_options.update(tor_overrides)
158+
159+
if launch_tor:
160+
log.info("Spawning a new Tor instance.")
161+
# TODO: Pass in data_dir directory so consensus can be cached
162+
tor = yield txtorcon.launch(reactor)
163+
else:
164+
log.info("Trying to connect to a running Tor instance.")
165+
if control_port:
166+
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", control_port)
167+
else:
168+
endpoint = None
169+
tor = yield txtorcon.connect(reactor, endpoint)
170+
171+
# Get Tor state first to avoid a race conditions where CONF_CHANGED
172+
# messages are received while Txtorcon is reading the consensus.
173+
tor_state = yield tor.create_state()
174+
175+
# Get current TorConfig object
176+
tor_config = yield tor.get_config()
177+
wait_for_consensus = options_need_new_consensus(tor_config, tor_options)
178+
179+
# Update Tor config options from dictionary
180+
for key, value in tor_options.items():
181+
setattr(tor_config, key, value)
182+
yield tor_config.save() # Send updated options to Tor
183+
184+
if wait_for_consensus:
185+
yield wait_for_newconsensus(tor_state)
186+
187+
defer.returnValue(tor_state)

bwscanner/circuit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def random_path_to_exit(exit_relay, relays):
3131
class CircuitGenerator(object):
3232
def __init__(self, state):
3333
self.state = state
34-
self.relays = list(set(state.routers.values()))
34+
self.relays = list(set(r for r in state.routers.values() if r))
3535
self.exits = [relay for relay in self.relays if is_valid_exit(relay)]
3636

3737
def __iter__(self):

bwscanner/fetcher.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from bwscanner.logger import log
1313

1414

15-
def get_orport_endpoint(tor_state):
15+
def get_tor_socks_endpoint(tor_state):
1616
proxy_endpoint = tor_state.protocol.get_conf("SocksPort")
1717

1818
def extract_port_value(result):
@@ -54,23 +54,23 @@ def __init__(self, host, port, state, path):
5454
self.path = path
5555
self.state = state
5656

57-
self.or_endpoint = get_orport_endpoint(state)
57+
self.tor_socks_endpoint = get_tor_socks_endpoint(state)
5858

5959
def connect(self, protocol_factory):
6060
"""
6161
Implements L{IStreamClientEndpoint.connect} to connect via TCP, after
6262
SOCKS5 negotiation and Tor circuit construction is done.
6363
"""
6464
proxy_factory = SOCKS5ClientFactory(self.host, self.port, protocol_factory)
65-
self.or_endpoint.addCallback(lambda end: end.connect(proxy_factory))
65+
self.tor_socks_endpoint.addCallback(lambda end: end.connect(proxy_factory))
6666

6767
def _create_circ(proto):
6868
hp = proto.transport.getHost()
69-
d = self.state.attacher.create_circuit(hp.host, hp.port, self.path)
69+
d = self.state._attacher.create_circuit(hp.host, hp.port, self.path)
7070
d.addErrback(proxy_factory.deferred.errback)
7171
return proxy_factory.deferred
7272

73-
return self.or_endpoint.addCallback(_create_circ)
73+
return self.tor_socks_endpoint.addCallback(_create_circ)
7474

7575

7676
class OnionRoutedAgent(Agent):

bwscanner/scanner.py

Lines changed: 14 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,16 @@
44

55
import click
66
from twisted.internet import reactor
7-
from txtorcon import build_local_tor_connection, TorConfig
87

8+
from bwscanner.attacher import connect_to_tor
99
from bwscanner.logger import setup_logging, log
10-
from bwscanner.attacher import start_tor, update_tor_config, FETCH_ALL_DESCRIPTOR_OPTIONS
1110
from bwscanner.measurement import BwScan
1211
from bwscanner.aggregate import write_aggregate_data
1312

1413

1514
BWSCAN_VERSION = '0.0.1'
1615

1716

18-
def connect_to_tor(launch_tor, circuit_build_timeout, circuit_idle_timeout):
19-
"""
20-
Launch or connect to a Tor instance
21-
22-
Configure Tor with the passed options and return a Deferred
23-
"""
24-
# Options for spawned or running Tor to load the correct descriptors.
25-
tor_options = {
26-
'LearnCircuitBuildTimeout': 0, # Disable adaptive circuit timeouts.
27-
'CircuitBuildTimeout': circuit_build_timeout,
28-
'CircuitIdleTimeout': circuit_idle_timeout,
29-
'UseEntryGuards': 0, # Disable UseEntryGuards to avoid PathBias warnings.
30-
}
31-
32-
def tor_status(tor):
33-
log.info("Connected successfully to Tor.")
34-
return tor
35-
36-
if launch_tor:
37-
log.info("Spawning a new Tor instance.")
38-
c = TorConfig()
39-
# Update Tor config before launching a new Tor.
40-
c.config.update(tor_options)
41-
c.config.update(FETCH_ALL_DESCRIPTOR_OPTIONS)
42-
tor = start_tor(c)
43-
44-
else:
45-
log.info("Trying to connect to a running Tor instance.")
46-
tor = build_local_tor_connection(reactor)
47-
# Update the Tor config on a running Tor.
48-
tor.addCallback(update_tor_config, tor_options)
49-
tor.addCallback(update_tor_config, FETCH_ALL_DESCRIPTOR_OPTIONS)
50-
51-
tor.addCallback(tor_status)
52-
return tor
53-
54-
5517
class ScanInstance(object):
5618
"""
5719
Store the configuration and state for the CLI tool.
@@ -80,11 +42,9 @@ def __repr__(self):
8042
help='Launch Tor or try to connect to an existing Tor instance.')
8143
@click.option('--circuit-build-timeout', default=20,
8244
help='Option passed when launching Tor.')
83-
@click.option('--circuit-idle-timeout', default=20,
84-
help='Option passed when launching Tor.')
8545
@click.version_option(BWSCAN_VERSION)
8646
@click.pass_context
87-
def cli(ctx, data_dir, loglevel, logfile, launch_tor, circuit_build_timeout, circuit_idle_timeout):
47+
def cli(ctx, data_dir, loglevel, logfile, launch_tor, circuit_build_timeout):
8848
"""
8949
The bwscan tool measures Tor relays and calculates their bandwidth. These
9050
bandwidth measurements can then be aggregate to create the bandwidth
@@ -98,7 +58,7 @@ def cli(ctx, data_dir, loglevel, logfile, launch_tor, circuit_build_timeout, cir
9858
os.makedirs(ctx.obj.measurement_dir)
9959

10060
# Create a connection to a Tor instance
101-
ctx.obj.tor = connect_to_tor(launch_tor, circuit_build_timeout, circuit_idle_timeout)
61+
ctx.obj.tor_state = connect_to_tor(launch_tor, circuit_build_timeout)
10262

10363
# Set up the logger to only output log lines of level `loglevel` and above.
10464
setup_logging(log_level=loglevel, log_name=logfile)
@@ -131,12 +91,14 @@ def rename_finished_scan(deferred):
13191
click.echo(deferred)
13292
os.rename(scan_data_dir, os.path.join(scan.measurement_dir, scan_time))
13393

134-
scan.tor.addCallback(BwScan, reactor, scan_data_dir, request_timeout=timeout,
135-
request_limit=request_limit, partitions=partitions,
136-
this_partition=current_partition)
137-
scan.tor.addCallback(lambda scanner: scanner.run_scan())
138-
scan.tor.addCallback(lambda _: reactor.stop())
139-
scan.tor.addCallback(rename_finished_scan)
94+
scan.tor_state.addCallback(BwScan, reactor, scan_data_dir,
95+
request_timeout=timeout,
96+
request_limit=request_limit,
97+
partitions=partitions,
98+
this_partition=current_partition)
99+
scan.tor_state.addCallback(lambda scanner: scanner.run_scan())
100+
scan.tor_state.addCallback(lambda _: reactor.stop())
101+
scan.tor_state.addCallback(rename_finished_scan)
140102

141103
reactor.run()
142104

@@ -192,7 +154,7 @@ def aggregate(scan, scan_name, previous):
192154
scan_data_dirs = [os.path.join(scan.measurement_dir, name) for name in recent_scan_names]
193155
log.info("Aggregating data from past {count} scans.", count=len(scan_data_dirs))
194156

195-
scan.tor.addCallback(lambda tor: write_aggregate_data(tor, scan_data_dirs))
196-
scan.tor.addErrback(lambda failure: log.failure("Unexpected error"))
197-
scan.tor.addCallback(lambda _: reactor.stop())
157+
scan.tor_state.addCallback(lambda tor_state: write_aggregate_data(tor_state, scan_data_dirs))
158+
scan.tor_state.addErrback(lambda failure: log.failure("Unexpected error"))
159+
scan.tor_state.addCallback(lambda _: reactor.stop())
198160
reactor.run()

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ service-identity==16.0.0
44
stem>=1.4.0
55
Twisted==16.2.0
66
txsocksx==1.15.0.2
7-
txtorcon==0.15.0
7+
txtorcon==0.19.3

test/template.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,28 @@
22
import random
33

44
from twisted.internet import defer, reactor
5-
from twisted.internet.endpoints import TCP4ClientEndpoint
65
from twisted.trial import unittest
7-
from txtorcon.torstate import build_tor_connection
86

97
from bwscanner import circuit
10-
from bwscanner.attacher import SOCKSClientStreamAttacher
8+
from bwscanner.attacher import SOCKSClientStreamAttacher, connect_to_tor
119

1210

1311
class TorTestCase(unittest.TestCase):
1412

1513
@defer.inlineCallbacks
1614
def setUp(self):
17-
self.tor = yield build_tor_connection(
18-
TCP4ClientEndpoint(reactor, 'localhost', int(
19-
os.environ.get('CHUTNEY_CONTROL_PORT'))))
15+
self.tor_state = yield connect_to_tor(
16+
launch_tor=False,
17+
control_port=int(os.environ.get('CHUTNEY_CONTROL_PORT')),
18+
circuit_build_timeout=30,
19+
)
2020

21-
self.attacher = SOCKSClientStreamAttacher(self.tor)
22-
yield self.tor.set_attacher(self.attacher, reactor)
21+
self.attacher = SOCKSClientStreamAttacher(self.tor_state)
22+
yield self.tor_state.set_attacher(self.attacher, reactor)
2323

2424
@property
2525
def routers(self):
26-
return list(set(self.tor.routers.values()))
26+
return list(set(self.tor_state.routers.values()))
2727

2828
@property
2929
def exits(self):
@@ -35,7 +35,7 @@ def random_path(self):
3535

3636
@defer.inlineCallbacks
3737
def tearDown(self):
38-
yield self.tor.set_attacher(None, reactor)
39-
yield self.tor.protocol.quit()
38+
yield self.tor_state.set_attacher(None, reactor)
39+
yield self.tor_state.protocol.quit()
4040
# seems to leave dirty reactor otherwise?
41-
yield self.tor.protocol.transport.loseConnection()
41+
yield self.tor_state.protocol.transport.loseConnection()

test/test_attacher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class TestSOCKSClientStreamAttacher(TorTestCase):
5454
def setUp(self):
5555
yield super(TestSOCKSClientStreamAttacher, self).setUp()
5656
# do not attach circuits automatically
57-
yield self.tor.set_attacher(None, reactor)
57+
yield self.tor_state.set_attacher(None, reactor)
5858

5959
@defer.inlineCallbacks
6060
def test_create_circuit(self):

test/test_circuit.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ def test_exit_scan(self):
88
all_exits = set(self.exits)
99
num_circuits = 0
1010
seen = set()
11-
for circuit in ExitScan(self.tor):
11+
for circuit in ExitScan(self.tor_state):
1212
assert len(circuit) == 3
1313
assert 'exit' in circuit[-1].flags
1414
seen.add(circuit[-1])
@@ -20,7 +20,7 @@ def test_two_hop(self):
2020
all_r = set(self.routers)
2121
num_circuits = 0
2222
seen = set()
23-
for circuit in TwoHop(self.tor):
23+
for circuit in TwoHop(self.tor_state):
2424
assert len(circuit) == 2
2525
assert 'exit' in circuit[-1].flags
2626
num_circuits = num_circuits + 1

0 commit comments

Comments
 (0)