Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit c77c600

Browse files
authored
Merge pull request #101 from TheTorProject/use_txtorcon_web_agent
Use txtorcon web agent
2 parents 97f93b6 + 8c902e6 commit c77c600

10 files changed

Lines changed: 42 additions & 393 deletions

File tree

bwscanner/attacher.py

Lines changed: 0 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,9 @@
11
import txtorcon
22
from twisted.internet import defer, reactor, endpoints
3-
from txtorcon.interface import CircuitListenerMixin, IStreamAttacher, StreamListenerMixin
4-
from zope.interface import implementer
53

64
from bwscanner.logger import log
75

86

9-
@implementer(IStreamAttacher)
10-
class SOCKSClientStreamAttacher(CircuitListenerMixin, StreamListenerMixin):
11-
"""
12-
An attacher that builds a chosen path for a client identified by
13-
its source port and ip address.
14-
"""
15-
16-
def __init__(self, state):
17-
"""
18-
Instantiates a SOCKSClientStreamAttacher with a
19-
txtorcon.torstate.TorState instance.
20-
"""
21-
self.state = state
22-
self.waiting_circuits = {}
23-
self.expected_streams = {}
24-
self.state.add_stream_listener(self)
25-
self.state.add_circuit_listener(self)
26-
27-
def create_circuit(self, host, port, path, using_guards=False):
28-
"""
29-
Specify the path for streams created on a specific client
30-
SOCKS connection.
31-
32-
Returns a deferred that calls back with the constructed circuit
33-
or errs back with a failure instance.
34-
"""
35-
circ_deferred = defer.Deferred()
36-
key = (str(host), int(port))
37-
self.expected_streams[key] = circ_deferred
38-
39-
def add_to_waiting(circ):
40-
self.waiting_circuits[circ.id] = (circ, circ_deferred)
41-
return circ
42-
43-
circuit_build = self.state.build_circuit(
44-
path, using_guards=using_guards)
45-
circuit_build.addCallback(add_to_waiting)
46-
return circ_deferred
47-
48-
def attach_stream(self, stream, _):
49-
"""
50-
Attaches a NEW stream to the circuit created for it by matching the
51-
source address and source port of the SOCKS client connection to the
52-
corresponding circuit in the expected_streams dictionary.
53-
54-
Returns a deferred that calls back with the appropriate circuit,
55-
or None if there is no matching entry.
56-
57-
Note, Tor can be configured to leave streams unattached by setting
58-
the "__LeaveStreamsUnattached" torrc option to "1".
59-
"""
60-
try:
61-
key = (str(stream.source_addr), int(stream.source_port))
62-
return self.expected_streams.pop(key)
63-
except KeyError:
64-
# We didn't expect this stream, so let Tor handle it
65-
return None
66-
67-
def circuit_built(self, circuit):
68-
"""
69-
Calls back the deferred awaiting the circuit build with the
70-
circuit object.
71-
"""
72-
if circuit.purpose != "GENERAL":
73-
return
74-
try:
75-
(_, circ_deferred) = self.waiting_circuits.pop(circuit.id)
76-
circ_deferred.callback(circuit)
77-
except KeyError:
78-
pass
79-
80-
def circuit_failed(self, circuit, **kw):
81-
"""
82-
Calls the errback of the deferred waiting the circuit build if the
83-
circuit build failed. The failure reason is contained in the circuit
84-
object. The corresponding state in waiting_circuits is removed.
85-
86-
If the circuit failure did not correspond to a circuit requested
87-
by create_circuit, it is ignored.
88-
"""
89-
try:
90-
(circ, circ_deferred) = self.waiting_circuits.pop(circuit.id)
91-
circ_deferred.errback(circ)
92-
except KeyError:
93-
pass
94-
95-
96-
class StreamClosedListener(StreamListenerMixin):
97-
"""
98-
Closes the contained circuit if the listened stream closes.
99-
100-
This StreamListener is used to instruct Tor to close circuits
101-
immediately after a stream completes rather than wait for the
102-
circuit to time out.
103-
"""
104-
def __init__(self, circ):
105-
self.circ = circ
106-
107-
def stream_closed(self, *args, **kw):
108-
self.circ.close(ifUnused=True)
109-
110-
1117
def options_need_new_consensus(tor_config, new_options):
1128
"""
1139
Check if we need to wait for a new consensus after updating

bwscanner/fetcher.py

Lines changed: 10 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
import warnings
22
import hashlib
33

4-
from twisted.internet import interfaces, reactor, defer, protocol
4+
from twisted.internet import reactor, defer, protocol
55
from twisted.internet.endpoints import TCP4ClientEndpoint
6-
from twisted.web.client import (SchemeNotSupported, Agent, BrowserLikePolicyForHTTPS,
7-
ResponseDone, PotentialDataLoss, PartialDownloadError)
8-
from txsocksx.client import SOCKS5ClientFactory
9-
from txsocksx.tls import TLSWrapClientEndpoint
10-
from zope.interface import implementer
6+
from twisted.web.client import (ResponseDone, PotentialDataLoss, PartialDownloadError)
117

128
from bwscanner.logger import log
139

1410

11+
def fetch(tor_state, path, url):
12+
d = tor_state.build_circuit(path, False)
13+
sport = get_tor_socks_endpoint(tor_state)
14+
d.addCallback(lambda c: c.when_built())
15+
d.addCallback(lambda c: c.web_agent(reactor, sport))
16+
return d.addCallback(lambda a: a.request("GET", url))
17+
18+
1519
def get_tor_socks_endpoint(tor_state):
1620
proxy_endpoint = tor_state.protocol.get_conf("SocksPort")
1721

@@ -31,78 +35,6 @@ def extract_port_value(result):
3135
return proxy_endpoint
3236

3337

34-
@implementer(interfaces.IStreamClientEndpoint)
35-
class OnionRoutedTCPClientEndpoint(object):
36-
def __init__(self, host, port, state, path):
37-
"""
38-
@param reactor: An L{IReactorTCP} provider
39-
40-
@param host: A hostname, used when connecting
41-
@type host: str
42-
43-
@param port: The port number, used when connecting
44-
@type port: int
45-
46-
@param path: A list of relay identities.
47-
@type path: list
48-
49-
This endpoint will be routed through Tor over a circuit
50-
defined by path.
51-
"""
52-
self.host = host
53-
self.port = port
54-
self.path = path
55-
self.state = state
56-
57-
self.tor_socks_endpoint = get_tor_socks_endpoint(state)
58-
59-
def connect(self, protocol_factory):
60-
"""
61-
Implements L{IStreamClientEndpoint.connect} to connect via TCP, after
62-
SOCKS5 negotiation and Tor circuit construction is done.
63-
"""
64-
proxy_factory = SOCKS5ClientFactory(self.host, self.port, protocol_factory)
65-
self.tor_socks_endpoint.addCallback(lambda end: end.connect(proxy_factory))
66-
67-
def _create_circ(proto):
68-
hp = proto.transport.getHost()
69-
d = self.state._attacher.create_circuit(hp.host, hp.port, self.path)
70-
d.addErrback(proxy_factory.deferred.errback)
71-
return proxy_factory.deferred
72-
73-
return self.tor_socks_endpoint.addCallback(_create_circ)
74-
75-
76-
class OnionRoutedAgent(Agent):
77-
_tlsWrapper = TLSWrapClientEndpoint
78-
_policyForHTTPS = BrowserLikePolicyForHTTPS
79-
80-
def __init__(self, *args, **kw):
81-
self.path = kw.pop('path')
82-
self.state = kw.pop('state')
83-
super(OnionRoutedAgent, self).__init__(*args, **kw)
84-
85-
def _getEndpoint(self, parsedURI, host=None, port=None):
86-
try:
87-
host, port = parsedURI.host, parsedURI.port
88-
scheme = parsedURI.scheme
89-
except AttributeError:
90-
scheme = parsedURI
91-
if scheme not in ('http', 'https'):
92-
raise SchemeNotSupported('unsupported scheme', scheme)
93-
endpoint = OnionRoutedTCPClientEndpoint(host, port, self.state,
94-
self.path)
95-
if scheme == 'https':
96-
if hasattr(self, '_wrapContextFactory'):
97-
tls_policy = self._wrapContextFactory(host, port)
98-
elif hasattr(self, '_policyForHTTPS'):
99-
tls_policy = self._policyForHTTPS().creatorForNetloc(host, port)
100-
else:
101-
raise NotImplementedError("Cannot create a TLS validation policy.")
102-
endpoint = self._tlsWrapper(tls_policy, endpoint)
103-
return endpoint
104-
105-
10638
class hashingReadBodyProtocol(protocol.Protocol):
10739
"""
10840
Protocol that collects data sent to it and hashes it.

bwscanner/measurement.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
from twisted.internet import defer
77

88
from bwscanner.logger import log
9-
from bwscanner.attacher import SOCKSClientStreamAttacher
109
from bwscanner.circuit import TwoHop
11-
from bwscanner.fetcher import OnionRoutedAgent, hashingReadBody
10+
from bwscanner.fetcher import hashingReadBody, fetch
1211
from bwscanner.writer import ResultSink
1312

1413
# defer.setDebugging(True)
@@ -31,6 +30,7 @@ def __init__(self, state, clock, measurement_dir, **kwargs):
3130
this_partition: which partition of circuit we will process
3231
"""
3332
self.state = state
33+
self._socks = None
3434
self.clock = clock
3535
self.measurement_dir = measurement_dir
3636
self.partitions = kwargs.get('partitions', 1)
@@ -55,9 +55,6 @@ def __init__(self, state, clock, measurement_dir, **kwargs):
5555

5656
self.result_sink = ResultSink(self.measurement_dir, chunk_size=10)
5757

58-
# Add a stream attacher
59-
self.state.set_attacher(SOCKSClientStreamAttacher(self.state), clock)
60-
6158
def now(self):
6259
return time.time()
6360

@@ -162,14 +159,13 @@ def gotResult(result):
162159
return result
163160
deferred.addBoth(gotResult)
164161

165-
agent = OnionRoutedAgent(self.clock, path=path, state=self.state)
166-
request = agent.request("GET", url)
167-
request.addCallback(hashingReadBody) # returns a readBody Deferred
168-
timeoutDeferred(request, self.request_timeout)
169-
request.addCallbacks(get_circuit_bw)
170-
request.addErrback(circ_failure)
171-
request.addCallback(self.result_sink.send)
172-
return request
162+
d = fetch(self.state, path, url)
163+
d.addCallback(hashingReadBody)
164+
timeoutDeferred(d, self.request_timeout)
165+
d.addCallbacks(get_circuit_bw)
166+
d.addErrback(circ_failure)
167+
d.addCallback(self.result_sink.send)
168+
return d
173169

174170
@defer.inlineCallbacks
175171
def get_r_ns_bw(self, router):

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ service-identity==16.0.0
44
stem>=1.4.0
55
Twisted==16.2.0
66
txsocksx==1.15.0.2
7-
txtorcon==0.19.3
7+
txtorcon==0.20.0

scripts/exitip.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
import json
33
import re
44
import sys
5-
from bwscanner.attacher import SOCKSClientStreamAttacher, start_tor
6-
from bwscanner.circuit import ExitScan
7-
from bwscanner.fetcher import OnionRoutedAgent
5+
from bwscanner.attacher import start_tor
86
from twisted.internet import defer, reactor, task
97
from twisted.python import log
108
from twisted.web.client import readBody
@@ -57,20 +55,20 @@ def pop(circuits):
5755
def shutdown(ignore):
5856
reactor.stop()
5957

60-
def add_attacher(state):
61-
state.set_attacher(SOCKSClientStreamAttacher(state), reactor)
62-
return state
63-
6458
def setup_failed(failure):
6559
log.err(failure)
6660

6761
def save_results(result, outfile):
6862
outfile.write(json.dumps(dict([r[1] for r in result if r[1] != None])))
6963

64+
@defer.inlineCallbacks
65+
def socks(state):
66+
s = yield get_tor_socks_endpoint(state)
67+
defer.returnValue(s)
68+
7069
def main():
7170
log.startLogging(sys.stdout)
7271
tor = start_tor(TorConfig())
73-
tor.addCallback(add_attacher)
7472
tor.addCallback(run_scan)
7573
tor.addErrback(log.err)
7674
tor.addBoth(shutdown)

test/template.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from twisted.trial import unittest
66

77
from bwscanner import circuit
8-
from bwscanner.attacher import SOCKSClientStreamAttacher, connect_to_tor
8+
from bwscanner.attacher import connect_to_tor
99

1010

1111
class TorTestCase(unittest.TestCase):
@@ -18,9 +18,6 @@ def setUp(self):
1818
circuit_build_timeout=30,
1919
)
2020

21-
self.attacher = SOCKSClientStreamAttacher(self.tor_state)
22-
yield self.tor_state.set_attacher(self.attacher, reactor)
23-
2421
@property
2522
def routers(self):
2623
return list(set(self.tor_state.routers.values()))

0 commit comments

Comments
 (0)