This commit is contained in:
Alexander Munch-Hansen 2019-10-02 19:38:54 +02:00
parent 2f77e50fad
commit 65b1f6fecf

186
week6.py
View File

@ -2,11 +2,53 @@ from __future__ import annotations
import hashlib
import itertools
import math
import random
import secrets
from typing import List
from secrets import SystemRandom
from typing import List, Union
from .week1 import BloodType, blood_cell_compatibility_lookup
from .week4 import gen_prime
class ElGamal:
def __init__(self, g, q, p):
self.gen_ = g
self.order = q
self.p = p
def gen_key(self):
key = SystemRandom().randint(1, self.order)
while math.gcd(self.order, key) != 1:
key = SystemRandom().randint(1, self.order)
return key
def gen(self, sk):
h = pow(self.gen_, sk, self.order)
return (self.gen_, h)
def enc(self, m, pk):
# sample random r \in Zq
r = SystemRandom().randint(1, self.order)
g, h = pk
s = pow(h, r, self.order)
p = pow(g, r, self.order)
tmp = int.from_bytes(m, "big")
c = s * tmp
return c, p
def dec(self, c, sk):
c1, c2 = c
h = pow(c2, sk, self.order)
m = c1 // h
return m.to_bytes(16, "big")
def ogen(self):
s = SystemRandom().randint(1, self.order)
h = pow(s, 2, self.order)
return self.gen_, h
from week1 import BloodType
def sha256(b: bytes) -> bytes:
@ -14,15 +56,15 @@ def sha256(b: bytes) -> bytes:
def rand_bytes():
return secrets.SystemRandom().getrandbits(128).to_bytes(16, "big")
return SystemRandom().getrandbits(128).to_bytes(16, "big")
def xor_bytes(a: bytes, b: bytes, k=256) -> bytes:
def xor_bytes(a: bytes, b: bytes, k=32) -> bytes:
return (int.from_bytes(a, "big") ^ int.from_bytes(b, "big")).to_bytes(k, "big")
class Gate:
def __init__(self, left: Gate, right: Gate, index: int) -> None:
def __init__(self, left: Union[Gate, InputWire], right: Union[Gate, InputWire], index: int) -> None:
self.left = left
self.right = right
self.i = index
@ -36,13 +78,26 @@ class Gate:
for a, b in itertools.product((0, 1), repeat=2):
c_prime[(a, b)] = xor_bytes(
sha256(self.left.k[a] + self.right.k[b] + self.i.to_bytes(1, "big")),
self.k[not a * b] + bytes(128 // 8)
self.k[self.f(a, b)] + bytes(16)
)
pi = list(itertools.product((0, 1), repeat=2))
random.shuffle(pi)
self.c = {i: c_prime[p]
for i, p in enumerate(pi)}
def f(self, a, b):
raise NotImplemented
class ImplyGate(Gate):
def f(self, a, b):
return a >= b
class AndGate(Gate):
def f(self, a, b):
return a * b
class InputWire:
def __init__(self, index) -> None:
@ -59,14 +114,18 @@ class Circuit:
self.input_wires = input_wires
self.gates = gates
@property
def d(self):
return self.gates[-1].k
def evaluate(self, x: List[bytes]) -> bytes:
for i, xi in enumerate(x):
self.input_wires[i].output = xi
for i, input_wire in enumerate(self.input_wires):
input_wire.output = x[i]
for gate in self.gates:
for j in range(4):
xor = xor_bytes(
sha256(gate.left.output + gate.right.output + gate.i),
sha256(gate.left.output + gate.right.output + gate.i.to_bytes(1, "big")),
gate.c[j]
)
k_prime, tau = xor[:16], xor[16:]
@ -82,19 +141,110 @@ def encode(e: List[InputWire], x: List[int]) -> List[bytes]:
class Alice:
pass
def __init__(self, ra, rb, rs, elgamal):
self.elgamal = elgamal
self.sks = None
self.input = [ra, rb, rs]
self.keys = None
def send_pks(self):
pks = []
self.sks = []
for idx, input_ in enumerate(self.input):
sk = self.elgamal.gen_key()
pk = self.elgamal.gen(sk)
self.sks.append(sk)
fake_pk = self.elgamal.ogen()
pk_tuple = [fake_pk]
pk_tuple.insert(input_, pk)
pks.append(pk_tuple)
return pks
def retrieve(self, circuit, bob_keys, ciphers):
self.keys = []
for idx, sk in enumerate(self.sks):
self.keys.append(self.elgamal.dec(ciphers[idx][self.input[idx]], sk))
all_keys = self.keys + bob_keys
res = circuit.evaluate(all_keys)
if circuit.d[0] == res:
return 0
if circuit.d[1] == res:
return 1
raise Exception("Fuck you")
class Bob:
pass
def __init__(self, da, db, ds, elgamal):
input_wire1 = InputWire(0)
input_wire2 = InputWire(1)
input_wire3 = InputWire(2)
input_wire4 = InputWire(3)
input_wire5 = InputWire(4)
input_wire6 = InputWire(5)
impl_gate_1 = ImplyGate(input_wire1, input_wire4, 6)
impl_gate_2 = ImplyGate(input_wire2, input_wire5, 7)
impl_gate_3 = ImplyGate(input_wire3, input_wire6, 8)
and_gate_1 = AndGate(impl_gate_1, impl_gate_2, 9)
and_gate_2 = AndGate(and_gate_1, impl_gate_3, 10)
self.circuit = Circuit(
input_wires=[input_wire1, input_wire2, input_wire3, input_wire4, input_wire5, input_wire6],
gates=[impl_gate_1, impl_gate_2, impl_gate_3, and_gate_1, and_gate_2]
)
self.own_keys = encode([input_wire4, input_wire5, input_wire6], [da, db, ds])
self.key_set = [x.k.values() for x in [input_wire1, input_wire2, input_wire3]]
self.elgamal = elgamal
self.pks = None
def receive_pks(self, pks):
self.pks = pks
def transfer_messages(self):
ciphers = []
for idx, (k0, k1) in enumerate(self.key_set):
pk0, pk1 = self.pks[idx]
c0 = self.elgamal.enc(k0, pk0)
c1 = self.elgamal.enc(k1, pk1)
ciphers.append((c0, c1))
return self.circuit, self.own_keys, ciphers
def run(donor: BloodType, recipient: BloodType):
m1 = Alice.choose(x)
m2 = Bob.transfer(y, m2)
z = Alice.retrieve(m2)
return z
def run(da, db, ds, ra, rb, rs):
p = gen_prime(256)
q = 2 * p + 1
g = SystemRandom().randint(2, q)
elgamal = ElGamal(g, q, p)
alice = Alice(ra=ra, rb=rb, rs=rs, elgamal=elgamal)
bob = Bob(da=da, db=db, ds=ds, elgamal=elgamal)
bob.receive_pks(alice.send_pks())
pls = alice.retrieve(*bob.transfer_messages())
return pls
def main():
run(donor=BloodType.A_NEGATIVE, recipient=BloodType.B_POSITIVE)
green = 0
red = 0
for i, recipient in enumerate(BloodType):
for j, donor in enumerate(BloodType):
z = run(*donor.value, *recipient.value)
lookup = blood_cell_compatibility_lookup(recipient, donor)
if lookup == z:
green += 1
else:
print(f"'{BloodType(donor).name} -> {BloodType(recipient).name}' should be {lookup}.")
red += 1
print("Green:", green)
print("Red :", red)
# run(donor=BloodType.A_NEGATIVE, recipient=BloodType.B_POSITIVE)