self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text)
- expected_data = ('\n# The following ssh key was injected by '
- 'Nova\nssh-rsa fake_keydata\n')
+ expected_data = (b'\n# The following ssh key was injected by '
+ b'Nova\nssh-rsa fake_keydata\n')
- injected_files = [('/root/.ssh/authorized_keys', expected_data)]
+ injected_files = [(b'/root/.ssh/authorized_keys', expected_data)]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
key_data='ssh-rsa fake_keydata')
self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text)
- expected_data = ('\n# The following ssh key was injected by '
- 'Nova\nssh-dsa fake_keydata\n')
+ expected_data = (b'\n# The following ssh key was injected by '
+ b'Nova\nssh-dsa fake_keydata\n')
- injected_files = [('/root/.ssh/authorized_keys', expected_data)]
+ injected_files = [(b'/root/.ssh/authorized_keys', expected_data)]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
key_data='ssh-dsa fake_keydata')
self.stubs.Set(stubs.FakeSessionForVMTests,
'_plugin_agent_inject_file', fake_inject_file)
- injected_files = [('/tmp/foo', 'foobar')]
+ injected_files = [(b'/tmp/foo', b'foobar')]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
injected_files=injected_files)
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp'
' -s 192.168.11.0/24')
- self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
+ match_rules = [rule for rule in self._out_rules if regex.match(rule)]
+ self.assertGreater(len(match_rules), 0,
"ICMP acceptance rule wasn't added")
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp -m icmp'
' --icmp-type 8 -s 192.168.11.0/24')
- self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
+ match_rules = [rule for rule in self._out_rules if regex.match(rule)]
+ self.assertGreater(len(match_rules), 0,
"ICMP Echo Request acceptance rule wasn't added")
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp --dport 80:81'
' -s 192.168.10.0/24')
- self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
+ match_rules = [rule for rule in self._out_rules if regex.match(rule)]
+ self.assertGreater(len(match_rules), 0,
"TCP port 80/81 acceptance rule wasn't added")
def test_static_filters(self):
continue
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp'
' --dport 80:81 -s %s' % ip['address'])
- self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
+ match_rules = [rule for rule in self._out_rules
+ if regex.match(rule)]
+ self.assertGreater(len(match_rules), 0,
"TCP port 80/81 acceptance rule wasn't added")
db.instance_destroy(admin_ctxt, instance_ref['uuid'])
self.fw.refresh_security_group_rules(secgroup)
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p udp --dport 200:299'
' -s 192.168.99.0/24')
- self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
+ match_rules = [rule for rule in self._out_rules if regex.match(rule)]
+ self.assertGreater(len(match_rules), 0,
"Rules were not updated properly. "
"The rule for UDP acceptance is missing")
# License for the specific language governing permissions and limitations
# under the License.
-import base64
import binascii
from distutils import version
import os
import time
from oslo_log import log as logging
+from oslo_serialization import base64
from oslo_serialization import jsonutils
+from oslo_utils import encodeutils
from oslo_utils import strutils
from oslo_utils import uuidutils
ctxt = context.get_admin_context()
enc = crypto.ssh_encrypt_text(sshkey, new_pass)
self.instance.system_metadata.update(
- password.convert_password(ctxt, base64.b64encode(enc)))
+ password.convert_password(ctxt, base64.encode_as_text(enc)))
self.instance.save()
def set_admin_password(self, new_pass):
LOG.debug('Injecting file path: %r', path, instance=self.instance)
# Files/paths must be base64-encoded for transmission to agent
- b64_path = base64.b64encode(path.encode('utf-8'))
- b64_contents = base64.b64encode(contents.encode('utf-8'))
+ b64_path = base64.encode_as_bytes(path)
+ b64_contents = base64.encode_as_bytes(contents)
args = {'b64_path': b64_path, 'b64_contents': b64_contents}
return self._call_agent('inject_file', args)
'pass:%s' % self._shared, '-nosalt']
if decrypt:
cmd.append('-d')
- out, err = utils.execute(*cmd, process_input=text)
+ out, err = utils.execute(*cmd,
+ process_input=encodeutils.safe_encode(text))
if err:
raise RuntimeError(_('OpenSSL error: %s') % err)
return out